From 97db8e0351b9451ece8fd16c25ca0588ec71a2ab Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 18 Mar 2025 07:26:35 +0100 Subject: Performance updates (#314) --- .../failure/hostfault/StartStopHostFault.kt | 2 +- .../org/opendc/compute/simulator/host/SimHost.kt | 6 +- .../org/opendc/compute/simulator/internal/Guest.kt | 36 ++-- .../simulator/provisioner/HostsProvisioningStep.kt | 26 +-- .../opendc/compute/topology/TopologyFactories.kt | 23 +- .../opendc/compute/topology/specs/TopologySpecs.kt | 12 +- .../org/opendc/simulator/compute/cpu/SimCpu.java | 17 +- .../simulator/compute/machine/SimMachine.java | 47 ++-- .../simulator/compute/machine/VirtualMachine.java | 240 --------------------- .../opendc/simulator/compute/memory/Memory.java | 4 +- .../simulator/compute/power/CarbonModel.java | 25 ++- .../simulator/compute/power/SimPowerSource.java | 15 +- .../org/opendc/simulator/compute/power/SimPsu.java | 18 +- .../compute/power/batteries/BatteryAggregator.java | 32 ++- .../compute/power/batteries/SimBattery.java | 20 +- .../power/batteries/policy/BatteryPolicy.java | 16 +- .../policy/DoubleThresholdBatteryPolicy.java | 8 +- .../batteries/policy/RunningMeanBatteryPolicy.java | 8 +- .../policy/RunningMeanPlusBatteryPolicy.java | 8 +- .../policy/SingleThresholdBatteryPolicy.java | 8 +- .../simulator/compute/workload/ChainWorkload.java | 11 +- .../compute/workload/CheckpointModel.java | 20 +- .../compute/workload/SimChainWorkload.java | 95 ++++++-- .../simulator/compute/workload/SimWorkload.java | 8 +- .../simulator/compute/workload/Workload.java | 6 +- .../compute/workload/trace/SimTraceWorkload.java | 19 +- .../compute/workload/trace/TraceWorkload.java | 11 +- .../org/opendc/simulator/compute/Coroutines.kt | 8 +- .../opendc/simulator/engine/engine/FlowEngine.java | 8 - .../simulator/engine/graph/FlowDistributor.java | 14 +- .../opendc/simulator/engine/graph/FlowEdge.java | 23 ++ .../opendc/simulator/engine/graph/FlowGraph.java | 115 ---------- .../opendc/simulator/engine/graph/FlowNode.java | 52 ++--- 33 files changed, 406 insertions(+), 555 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java diff --git a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt index 06fad3e6..37e26f6a 100644 --- a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt +++ b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt @@ -41,7 +41,7 @@ public class StartStopHostFault( for (host in victims) { val guests = host.getGuests() - val snapshots = guests.map { it.virtualMachine!!.getActiveWorkload().getSnapshot() } + val snapshots = guests.map { it.simChainWorkload!!.getSnapshot() } val tasks = guests.map { it.task } host.fail() diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index 132ad227..53fafe5f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -35,8 +35,8 @@ import org.opendc.simulator.compute.cpu.CpuPowerModel import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.models.MachineModel import org.opendc.simulator.compute.models.MemoryUnit +import org.opendc.simulator.engine.engine.FlowEngine import org.opendc.simulator.engine.graph.FlowDistributor -import org.opendc.simulator.engine.graph.FlowGraph import java.time.Duration import java.time.Instant import java.time.InstantSource @@ -56,7 +56,7 @@ public class SimHost( private val name: String, private val clusterName: String, private val clock: InstantSource, - private val graph: FlowGraph, + private val engine: FlowEngine, private val machineModel: MachineModel, private val cpuPowerModel: CpuPowerModel, private val powerDistributor: FlowDistributor, @@ -126,7 +126,7 @@ public class SimHost( this.simMachine = SimMachine( - this.graph, + this.engine, this.machineModel, this.powerDistributor, this.cpuPowerModel, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 83968d35..e2bdd960 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -29,8 +29,8 @@ import org.opendc.compute.simulator.service.ServiceTask import org.opendc.compute.simulator.telemetry.GuestCpuStats import org.opendc.compute.simulator.telemetry.GuestSystemStats import org.opendc.simulator.compute.machine.SimMachine -import org.opendc.simulator.compute.machine.VirtualMachine import org.opendc.simulator.compute.workload.ChainWorkload +import org.opendc.simulator.compute.workload.SimChainWorkload import org.opendc.simulator.compute.workload.trace.TraceFragment import org.opendc.simulator.compute.workload.trace.TraceWorkload import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling @@ -58,9 +58,9 @@ public class Guest( private set /** - * The [VirtualMachine] on which the task is currently running + * The [SimChainWorkload] on which the task is currently running */ - public var virtualMachine: VirtualMachine? = null + public var simChainWorkload: SimChainWorkload? = null private var uptime = 0L private var downtime = 0L @@ -90,7 +90,7 @@ public class Guest( * Launch the guest on the simulated Virtual machine */ private fun doStart() { - assert(virtualMachine == null) { "Concurrent job is already running" } + assert(simChainWorkload == null) { "Concurrent job is already running" } onStart() @@ -114,7 +114,12 @@ public class Guest( scalingPolicy, ) - if (task.workload is TraceWorkload) { + if (task.workload is ChainWorkload) { + simChainWorkload = + simMachine.startWorkload(task.workload as ChainWorkload) { cause -> + onStop(if (cause != null) TaskState.FAILED else TaskState.COMPLETED) + } + } else { val newChainWorkload = ChainWorkload( ArrayList(listOf(task.workload)), @@ -123,15 +128,10 @@ public class Guest( task.workload.checkpointIntervalScaling, ) - virtualMachine = + simChainWorkload = simMachine.startWorkload(newChainWorkload) { cause -> onStop(if (cause != null) TaskState.FAILED else TaskState.COMPLETED) } - } else { - virtualMachine = - simMachine.startWorkload(task.workload) { cause -> - onStop(if (cause != null) TaskState.FAILED else TaskState.COMPLETED) - } } } @@ -160,15 +160,15 @@ public class Guest( * Attempt to stop the task and put it into [target] state. */ private fun doStop(target: TaskState) { - assert(virtualMachine != null) { "Invalid job state" } - val virtualMachine = this.virtualMachine ?: return + assert(simChainWorkload != null) { "Invalid job state" } + val virtualMachine = this.simChainWorkload ?: return if (target == TaskState.FAILED) { - virtualMachine.shutdown(Exception("Task has failed")) + virtualMachine.stopWorkload(Exception("Task has failed")) } else { - virtualMachine.shutdown() + virtualMachine.stopWorkload() } - this.virtualMachine = null + this.simChainWorkload = null this.state = target } @@ -236,8 +236,8 @@ public class Guest( * Obtain the CPU statistics of this guest. */ public fun getCpuStats(): GuestCpuStats { - virtualMachine!!.updateCounters(this.clock.millis()) - val counters = virtualMachine!!.performanceCounters + simChainWorkload!!.updateCounters(this.clock.millis()) + val counters = simChainWorkload!!.performanceCounters return GuestCpuStats( counters.cpuActiveTime / 1000L, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 211f33fe..68e263f8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -34,6 +34,7 @@ import org.opendc.simulator.compute.power.batteries.BatteryAggregator import org.opendc.simulator.compute.power.batteries.SimBattery import org.opendc.simulator.engine.engine.FlowEngine import org.opendc.simulator.engine.graph.FlowDistributor +import org.opendc.simulator.engine.graph.FlowEdge /** * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. @@ -56,36 +57,35 @@ public class HostsProvisioningStep internal constructor( val simPowerSources = mutableListOf() val engine = FlowEngine.create(ctx.dispatcher) - val graph = engine.newGraph() for (cluster in clusterSpecs) { // Create the Power Source to which hosts are connected // Create Power Source - val simPowerSource = SimPowerSource(graph, cluster.powerSource.totalPower.toDouble(), cluster.powerSource.name, cluster.name) + val simPowerSource = SimPowerSource(engine, cluster.powerSource.totalPower.toDouble(), cluster.powerSource.name, cluster.name) simPowerSources.add(simPowerSource) service.addPowerSource(simPowerSource) - val hostDistributor = FlowDistributor(graph) + val hostDistributor = FlowDistributor(engine) val carbonFragments = getCarbonFragments(cluster.powerSource.carbonTracePath) var carbonModel: CarbonModel? = null // Create Carbon Model if (carbonFragments != null) { - carbonModel = CarbonModel(graph, carbonFragments, startTime) + carbonModel = CarbonModel(engine, carbonFragments, startTime) carbonModel.addReceiver(simPowerSource) } if (cluster.battery != null) { // Create Battery Distributor - val batteryDistributor = FlowDistributor(graph) - graph.addEdge(batteryDistributor, simPowerSource) + val batteryDistributor = FlowDistributor(engine) + FlowEdge(batteryDistributor, simPowerSource) // Create Battery val battery = SimBattery( - graph, + engine, cluster.battery!!.capacity, cluster.battery!!.chargingSpeed, cluster.battery!!.initialCharge, @@ -94,26 +94,26 @@ public class HostsProvisioningStep internal constructor( cluster.battery!!.embodiedCarbon, cluster.battery!!.expectedLifetime, ) - graph.addEdge(battery, batteryDistributor) + FlowEdge(battery, batteryDistributor) // Create Aggregator - val batteryAggregator = BatteryAggregator(graph, battery, batteryDistributor) + val batteryAggregator = BatteryAggregator(engine, battery, batteryDistributor) val batteryPolicy = createSimBatteryPolicy( cluster.battery!!.batteryPolicy, - graph, + engine, battery, batteryAggregator, ) carbonModel?.addReceiver(batteryPolicy) - graph.addEdge(hostDistributor, batteryAggregator) + FlowEdge(hostDistributor, batteryAggregator) service.addBattery(battery) } else { - graph.addEdge(hostDistributor, simPowerSource) + FlowEdge(hostDistributor, simPowerSource) } // Create hosts, they are connected to the powerMux when SimMachine is created @@ -123,7 +123,7 @@ public class HostsProvisioningStep internal constructor( hostSpec.name, cluster.name, ctx.dispatcher.timeSource, - graph, + engine, hostSpec.model, hostSpec.cpuPowerModel, hostDistributor, diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt index a20bc2c2..cc2c4b4e 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt @@ -44,10 +44,10 @@ import java.io.InputStream private val reader = TopologyReader() // Lists used to make sure all cluster, host, power source and battery have unique names -private val clusterNames: ArrayList = ArrayList() -private val hostNames: ArrayList = ArrayList() -private val powerSourceNames: ArrayList = ArrayList() -private val batteryNames: ArrayList = ArrayList() +private val clusterNames: HashMap = HashMap() +private val hostNames: HashMap = HashMap() +private val powerSourceNames: HashMap = HashMap() +private val batteryNames: HashMap = HashMap() /** * Create a unique name for the specified [name] that is not already in the [names] list. @@ -57,20 +57,19 @@ private val batteryNames: ArrayList = ArrayList() */ private fun createUniqueName( name: String, - names: ArrayList, + names: MutableMap, ): String { if (name !in names) { - names.add(name) + names[name] = 0 return name } - var i = 0 - var newName = "$name-$i" - while (newName in names) { - newName = "$name-${++i}" - } + val latestValue = names[name] + + val newName = "$name-$latestValue" + + names[name] = latestValue!! + 1 - names.add(newName) return newName } diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt index 3d8b63dc..920d8373 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt @@ -34,7 +34,7 @@ import org.opendc.simulator.compute.power.batteries.policy.DoubleThresholdBatter import org.opendc.simulator.compute.power.batteries.policy.RunningMeanBatteryPolicy import org.opendc.simulator.compute.power.batteries.policy.RunningMeanPlusBatteryPolicy import org.opendc.simulator.compute.power.batteries.policy.SingleThresholdBatteryPolicy -import org.opendc.simulator.engine.graph.FlowGraph +import org.opendc.simulator.engine.engine.FlowEngine /** * Definition of a Topology modeled in the simulation. @@ -236,21 +236,21 @@ public data class RunningQuartilesPolicyJSONSpec( public fun createSimBatteryPolicy( batterySpec: BatteryPolicyJSONSpec, - graph: FlowGraph, + engine: FlowEngine, battery: SimBattery, batteryAggregator: BatteryAggregator, ): BatteryPolicy { return when (batterySpec) { is SingleBatteryPolicyJSONSpec -> SingleThresholdBatteryPolicy( - graph, + engine, battery, batteryAggregator, batterySpec.carbonThreshold, ) is DoubleBatteryPolicyJSONSpec -> DoubleThresholdBatteryPolicy( - graph, + engine, battery, batteryAggregator, batterySpec.lowerThreshold, @@ -258,7 +258,7 @@ public fun createSimBatteryPolicy( ) is RunningMeanPolicyJSONSpec -> RunningMeanBatteryPolicy( - graph, + engine, battery, batteryAggregator, batterySpec.startingThreshold, @@ -266,7 +266,7 @@ public fun createSimBatteryPolicy( ) is RunningMeanPlusPolicyJSONSpec -> RunningMeanPlusBatteryPolicy( - graph, + engine, battery, batteryAggregator, batterySpec.startingThreshold, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index a9edaa97..fadfc4d6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -22,11 +22,13 @@ package org.opendc.simulator.compute.cpu; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.machine.PerformanceCounters; import org.opendc.simulator.compute.models.CpuModel; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -102,15 +104,15 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimCpu(FlowGraph graph, CpuModel cpuModel, CpuPowerModel powerModel, int id) { - super(graph); + public SimCpu(FlowEngine engine, CpuModel cpuModel, CpuPowerModel powerModel, int id) { + super(engine); this.cpuModel = cpuModel; this.maxCapacity = this.cpuModel.getTotalCapacity(); // TODO: connect this to the front-end this.cpuPowerModel = powerModel; - this.lastCounterUpdate = graph.getEngine().getClock().millis(); + this.lastCounterUpdate = clock.millis(); this.cpuFrequencyInv = 1 / this.maxCapacity; } @@ -255,4 +257,11 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer this.psuEdge = null; this.invalidate(); } + + @Override + public Map> getConnectedEdges() { + return Map.of( + FlowEdge.NodeType.CONSUMING, List.of(this.psuEdge), + FlowEdge.NodeType.SUPPLYING, List.of(this.distributorEdge)); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index dab0c421..55a5eb42 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -29,17 +29,19 @@ import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.memory.Memory; import org.opendc.simulator.compute.models.MachineModel; import org.opendc.simulator.compute.power.SimPsu; +import org.opendc.simulator.compute.workload.ChainWorkload; +import org.opendc.simulator.compute.workload.SimChainWorkload; import org.opendc.simulator.compute.workload.SimWorkload; -import org.opendc.simulator.compute.workload.Workload; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowEdge; /** * A machine that is able to execute {@link SimWorkload} objects. */ public class SimMachine { private final MachineModel machineModel; - private final FlowGraph graph; + private final FlowEngine engine; private final InstantSource clock; @@ -62,8 +64,8 @@ public class SimMachine { return machineModel; } - public FlowGraph getGraph() { - return graph; + public FlowEngine getEngine() { + return engine; } public InstantSource getClock() { @@ -112,29 +114,30 @@ public class SimMachine { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public SimMachine( - FlowGraph graph, + FlowEngine engine, MachineModel machineModel, FlowDistributor powerDistributor, CpuPowerModel cpuPowerModel, Consumer completion) { - this.graph = graph; + this.engine = engine; this.machineModel = machineModel; - this.clock = graph.getEngine().getClock(); + this.clock = engine.getClock(); // Create the psu and cpu and connect them - this.psu = new SimPsu(graph); + this.psu = new SimPsu(engine); - graph.addEdge(this.psu, powerDistributor); + new FlowEdge(this.psu, powerDistributor); - this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0); + this.cpu = new SimCpu(engine, this.machineModel.getCpuModel(), cpuPowerModel, 0); - graph.addEdge(this.cpu, this.psu); + new FlowEdge(this.cpu, this.psu); - this.memory = new Memory(graph, this.machineModel.getMemory()); + this.memory = new Memory(engine, this.machineModel.getMemory()); // Create a FlowDistributor and add the cpu as supplier - this.cpuDistributor = new FlowDistributor(this.graph); - graph.addEdge(this.cpuDistributor, this.cpu); + this.cpuDistributor = new FlowDistributor(engine); + + new FlowEdge(this.cpuDistributor, this.cpu); this.completion = completion; } @@ -147,13 +150,13 @@ public class SimMachine { * Close all related hardware */ public void shutdown(Exception cause) { - this.graph.removeNode(this.psu); + this.psu.closeNode(); this.psu = null; - this.graph.removeNode(this.cpu); + this.cpu.closeNode(); this.cpu = null; - this.graph.removeNode(this.cpuDistributor); + this.cpuDistributor.closeNode(); this.cpuDistributor = null; this.memory = null; @@ -181,11 +184,7 @@ public class SimMachine { * @param completion * @return */ - public VirtualMachine startWorkload(Workload workload, Consumer completion) { - final VirtualMachine vm = new VirtualMachine(this); - - vm.startWorkload(workload, completion); - - return vm; + public SimChainWorkload startWorkload(ChainWorkload workload, Consumer completion) { + return (SimChainWorkload) workload.startWorkload(this.cpuDistributor, this, completion); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java deleted file mode 100644 index 1946eecb..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.machine; - -import java.util.function.Consumer; -import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.compute.workload.SimWorkload; -import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.graph.FlowConsumer; -import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; -import org.opendc.simulator.engine.graph.FlowNode; -import org.opendc.simulator.engine.graph.FlowSupplier; - -/* - A virtual Machine created to run a single workload -*/ -public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSupplier { - private final SimMachine machine; - - private SimWorkload activeWorkload; - - private long lastUpdate; - private final double d; - - private FlowEdge cpuEdge; // The edge to the cpu - private FlowEdge workloadEdge; // The edge to the workload - - private double cpuDemand; - private double cpuSupply; - private double cpuCapacity; - - private PerformanceCounters performanceCounters = new PerformanceCounters(); - - private Consumer completion; - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Basic Getters and Setters - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public PerformanceCounters getPerformanceCounters() { - return performanceCounters; - } - - public SimWorkload getActiveWorkload() { - return activeWorkload; - } - - public double getDemand() { - return cpuDemand; - } - - public void setDemand(double demand) { - this.cpuDemand = demand; - } - - public double getCpuCapacity() { - return cpuCapacity; - } - - public void setCpuCapacity(double cpuCapacity) { - this.cpuCapacity = cpuCapacity; - } - - public FlowGraph getGraph() { - return this.parentGraph; - } - - public SimCpu getCpu() { - return machine.getCpu(); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Constructors - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public VirtualMachine(SimMachine machine) { - super(machine.getGraph()); - this.machine = machine; - this.clock = this.machine.getClock(); - - this.parentGraph = machine.getGraph(); - this.parentGraph.addEdge(this, this.machine.getCpuDistributor()); - - this.lastUpdate = clock.millis(); - this.lastUpdate = clock.millis(); - - this.d = 1 / machine.getCpu().getFrequency(); - } - - public void shutdown() { - this.shutdown(null); - } - - public void shutdown(Exception cause) { - if (this.nodeState == NodeState.CLOSED) { - return; - } - - super.closeNode(); - - this.activeWorkload = null; - this.performanceCounters = null; - - this.completion.accept(cause); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Workload related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public void startWorkload(Workload workload, Consumer completion) { - this.completion = completion; - this.activeWorkload = workload.startWorkload(this, this.clock.millis()); - } - - public void updateCounters(long now) { - long lastUpdate = this.lastUpdate; - this.lastUpdate = now; - long delta = now - lastUpdate; - - if (delta > 0) { - final double factor = this.d * delta; - - this.performanceCounters.addCpuActiveTime(Math.round(this.cpuSupply * factor)); - this.performanceCounters.setCpuIdleTime(Math.round((this.cpuCapacity - this.cpuSupply) * factor)); - this.performanceCounters.addCpuStealTime(Math.round((this.cpuDemand - this.cpuSupply) * factor)); - } - - this.performanceCounters.setCpuDemand(this.cpuDemand); - this.performanceCounters.setCpuSupply(this.cpuSupply); - this.performanceCounters.setCpuCapacity(this.cpuCapacity); - } - - @Override - public long onUpdate(long now) { - updateCounters(now); - - return Long.MAX_VALUE; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // FlowGraph Related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * Add an edge to the workload - * TODO: maybe add a check if there is already an edge - */ - @Override - public void addConsumerEdge(FlowEdge consumerEdge) { - this.workloadEdge = consumerEdge; - } - - /** - * Add an edge to the cpuMux - * TODO: maybe add a check if there is already an edge - */ - @Override - public void addSupplierEdge(FlowEdge supplierEdge) { - this.cpuEdge = supplierEdge; - } - - /** - * Push demand to the cpuMux if the demand has changed - **/ - @Override - public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.cpuEdge.pushDemand(newDemand); - } - - /** - * Push supply to the workload if the supply has changed - **/ - @Override - public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - this.workloadEdge.pushSupply(newSupply); - } - - /** - * Handle new demand from the workload by sending it through to the cpuMux - **/ - @Override - public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { - - updateCounters(this.clock.millis()); - this.cpuDemand = newDemand; - - pushOutgoingDemand(this.cpuEdge, newDemand); - } - - /** - * Handle a new supply pushed by the cpuMux by sending it through to the workload - **/ - @Override - public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) { - - updateCounters(this.clock.millis()); - this.cpuSupply = newCpuSupply; - - pushOutgoingSupply(this.workloadEdge, newCpuSupply); - } - - @Override - public void removeConsumerEdge(FlowEdge consumerEdge) { - this.workloadEdge = null; - this.shutdown(); - } - - @Override - public double getCapacity() { - return this.cpuCapacity; - } - - @Override - public void removeSupplierEdge(FlowEdge supplierEdge) { - this.cpuEdge = null; - this.shutdown(); - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java index d4406b20..2686cfa2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.memory; import org.opendc.simulator.compute.models.MemoryUnit; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * The [SimMemory] implementation for a machine. @@ -32,7 +32,7 @@ public final class Memory { // private final SimpleFlowSink sink; private final MemoryUnit memoryUnit; - public Memory(FlowGraph graph, MemoryUnit memoryUnit) { + public Memory(FlowEngine engine, MemoryUnit memoryUnit) { this.memoryUnit = memoryUnit; // TODO: Fix this diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java index b6246fe9..3bbdba66 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java @@ -24,7 +24,9 @@ package org.opendc.simulator.compute.power; import java.util.ArrayList; import java.util.List; -import org.opendc.simulator.engine.graph.FlowGraph; +import java.util.Map; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -45,13 +47,13 @@ public class CarbonModel extends FlowNode { /** * Construct a CarbonModel * - * @param parentGraph The active FlowGraph which should be used to make the new FlowNode + * @param engine The {@link FlowEngine} the node belongs to * @param carbonFragments A list of Carbon Fragments defining the carbon intensity at different time frames * @param startTime The start time of the simulation. This is used to go from relative time (used by the clock) * to absolute time (used by carbon fragments). */ - public CarbonModel(FlowGraph parentGraph, List carbonFragments, long startTime) { - super(parentGraph); + public CarbonModel(FlowEngine engine, List carbonFragments, long startTime) { + super(engine); this.startTime = startTime; this.fragments = carbonFragments; @@ -66,6 +68,8 @@ public class CarbonModel extends FlowNode { receiver.removeCarbonModel(this); } + receivers.clear(); + this.closeNode(); } @@ -128,4 +132,17 @@ public class CarbonModel extends FlowNode { receiver.updateCarbonIntensity(this.current_fragment.getCarbonIntensity()); } + + public static List castList(List list, Class clazz) { + List result = new ArrayList<>(); + for (T element : list) { + result.add(clazz.cast(element)); + } + return result; + } + + @Override + public Map> getConnectedEdges() { + return Map.of(); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index 718bc22a..34804230 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -22,9 +22,11 @@ package org.opendc.simulator.compute.power; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.cpu.SimCpu; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -111,8 +113,8 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier, Carb // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimPowerSource(FlowGraph graph, double max_capacity, String name, String clusterName) { - super(graph); + public SimPowerSource(FlowEngine engine, double max_capacity, String name, String clusterName) { + super(engine); this.capacity = max_capacity; @@ -208,4 +210,11 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier, Carb public void removeCarbonModel(CarbonModel carbonModel) { this.carbonModel = null; } + + @Override + public Map> getConnectedEdges() { + List supplierEdges = this.distributorEdge != null ? List.of(this.distributorEdge) : List.of(); + + return Map.of(FlowEdge.NodeType.SUPPLYING, supplierEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index dc5129d6..87a4e791 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -22,10 +22,12 @@ package org.opendc.simulator.compute.power; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.cpu.SimCpu; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -90,8 +92,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimPsu(FlowGraph graph) { - super(graph); + public SimPsu(FlowEngine engine) { + super(engine); lastUpdate = this.clock.millis(); } @@ -183,4 +185,14 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer public void removeSupplierEdge(FlowEdge supplierEdge) { this.powerSupplyEdge = null; } + + @Override + public Map> getConnectedEdges() { + List supplyingEdges = cpuEdge != null ? List.of(cpuEdge) : List.of(); + List consumingEdges = powerSupplyEdge != null ? List.of(powerSupplyEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.SUPPLYING, supplyingEdges, + FlowEdge.NodeType.CONSUMING, consumingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java index 14d15b17..9a05f2b3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java @@ -24,10 +24,12 @@ package org.opendc.simulator.compute.power.batteries; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -52,14 +54,15 @@ public class BatteryAggregator extends FlowNode implements FlowConsumer, FlowSup /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. */ - public BatteryAggregator(FlowGraph parentGraph, SimBattery battery, FlowDistributor powerSourceDistributor) { - super(parentGraph); + public BatteryAggregator(FlowEngine engine, SimBattery battery, FlowDistributor powerSourceDistributor) { + super(engine); - this.powerSourceEdge = parentGraph.addEdge(this, powerSourceDistributor); + this.powerSourceEdge = new FlowEdge(this, powerSourceDistributor); this.powerSourceEdge.setSupplierIndex(0); - this.batteryEdge = parentGraph.addEdge(this, battery); + + this.batteryEdge = new FlowEdge(this, battery); this.batteryEdge.setSupplierIndex(1); } @@ -170,4 +173,21 @@ public class BatteryAggregator extends FlowNode implements FlowConsumer, FlowSup public double getCapacity() { return 0; } + + @Override + public Map> getConnectedEdges() { + List consumingEdges = new ArrayList<>(); + if (this.powerSourceEdge != null) { + consumingEdges.add(this.batteryEdge); + } + if (this.batteryEdge != null) { + consumingEdges.add(this.powerSourceEdge); + } + + List supplyingEdges = this.hostEdge != null ? List.of(this.hostEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumingEdges, + FlowEdge.NodeType.SUPPLYING, supplyingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java index f09cbcee..d749af72 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java @@ -22,10 +22,12 @@ package org.opendc.simulator.compute.power.batteries; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.power.batteries.policy.BatteryPolicy; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -120,7 +122,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { /** * Construct a new {@link SimBattery} instance. * - * @param parentGraph The {@link FlowGraph} instance this battery is part of. + * @param engine The {@link FlowEngine} instance this battery is part of. * @param capacity The capacity of the battery in kWh. * @param chargingSpeed The charging speed of the battery in J. * @param initialCharge The initial charge of the battery in kWh. @@ -130,7 +132,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { * @param expectedLifeTime The expected lifetime of the battery in years. */ public SimBattery( - FlowGraph parentGraph, + FlowEngine engine, double capacity, double chargingSpeed, double initialCharge, @@ -139,7 +141,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { Double totalEmbodiedCarbon, Double expectedLifeTime) { - super(parentGraph); + super(engine); this.capacity = capacity * 3600000; this.chargingSpeed = chargingSpeed; @@ -319,4 +321,14 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { public void removeConsumerEdge(FlowEdge consumerEdge) { this.close(); } + + @Override + public Map> getConnectedEdges() { + List consumingEdges = (this.distributorEdge != null) ? List.of(this.distributorEdge) : List.of(); + List supplyingEdges = (this.aggregatorEdge != null) ? List.of(this.aggregatorEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumingEdges, + FlowEdge.NodeType.SUPPLYING, supplyingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java index a50f7e73..9d983a67 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java @@ -22,13 +22,16 @@ package org.opendc.simulator.compute.power.batteries.policy; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.power.CarbonModel; import org.opendc.simulator.compute.power.CarbonReceiver; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.PowerSourceType; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -47,10 +50,10 @@ public abstract class BatteryPolicy extends FlowNode implements CarbonReceiver { /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. */ - public BatteryPolicy(FlowGraph parentGraph, SimBattery battery, BatteryAggregator aggregator) { - super(parentGraph); + public BatteryPolicy(FlowEngine engine, SimBattery battery, BatteryAggregator aggregator) { + super(engine); this.battery = battery; this.battery.setBatteryPolicy(this); @@ -114,4 +117,9 @@ public abstract class BatteryPolicy extends FlowNode implements CarbonReceiver { public void removeCarbonModel(CarbonModel carbonModel) { this.close(); } + + @Override + public Map> getConnectedEdges() { + return Map.of(FlowEdge.NodeType.SUPPLYING, List.of()); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java index 3a9cb228..28302f4d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.power.batteries.policy; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses two thresholds to determine if a better should be charging or discharging. @@ -42,19 +42,19 @@ public class DoubleThresholdBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. * @param lowerThreshold The lower carbon intensity threshold to trigger charging or discharging. * @param upperThreshold The upper carbon intensity threshold to trigger charging or discharging. */ public DoubleThresholdBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double lowerThreshold, double upperThreshold) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.lowerThreshold = lowerThreshold; this.upperThreshold = upperThreshold; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java index 1c127abd..63e03942 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java @@ -26,7 +26,7 @@ import java.util.LinkedList; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses a running mean to determine if a battery should be charging or discharging. @@ -46,17 +46,17 @@ public class RunningMeanBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. */ public RunningMeanBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double startingThreshold, int windowSize) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.windowSize = windowSize; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java index 25b86dde..3330cd4e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java @@ -26,7 +26,7 @@ import java.util.LinkedList; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * An improved version of {@link RunningMeanBatteryPolicy}. @@ -43,17 +43,17 @@ public class RunningMeanPlusBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this stage belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. */ public RunningMeanPlusBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double startingThreshold, int windowSize) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.windowSize = windowSize; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java index 26d85958..22e6ae2f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.power.batteries.policy; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses a single threshold to determine if a better should be charging or discharging. @@ -38,14 +38,14 @@ public class SingleThresholdBatteryPolicy extends BatteryPolicy { private final double carbonThreshold; /** - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. * @param carbonThreshold The carbon intensity threshold to trigger charging or discharging. */ public SingleThresholdBatteryPolicy( - FlowGraph parentGraph, SimBattery battery, BatteryAggregator aggregator, double carbonThreshold) { - super(parentGraph, battery, aggregator); + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double carbonThreshold) { + super(engine, battery, aggregator); this.carbonThreshold = carbonThreshold; } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index ecd4c47f..76a715f7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -23,6 +23,8 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowSupplier; public class ChainWorkload implements Workload { @@ -66,7 +68,12 @@ public class ChainWorkload implements Workload { } @Override - public SimWorkload startWorkload(FlowSupplier supplier, long now) { - return new SimChainWorkload(supplier, this, now); + public SimWorkload startWorkload(FlowSupplier supplier) { + return new SimChainWorkload(supplier, this); + } + + @Override + public SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer completion) { + return new SimChainWorkload(supplier, this, machine, completion); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index 6cc67e3f..d34da203 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -27,9 +27,10 @@ package org.opendc.simulator.compute.workload; // TODO: Move this to a separate file //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -import java.time.InstantSource; +import java.util.List; +import java.util.Map; import org.jetbrains.annotations.NotNull; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; public class CheckpointModel extends FlowNode { @@ -37,23 +38,18 @@ public class CheckpointModel extends FlowNode { private long checkpointInterval; private final long checkpointDuration; private double checkpointIntervalScaling; - private FlowGraph graph; private long startOfInterval; public CheckpointModel(@NotNull SimWorkload simWorkload) { - super(simWorkload.getGraph()); + super(simWorkload.getEngine()); this.checkpointInterval = simWorkload.getCheckpointInterval(); this.checkpointDuration = simWorkload.getCheckpointDuration(); this.checkpointIntervalScaling = simWorkload.getCheckpointIntervalScaling(); this.simWorkload = simWorkload; - this.graph = simWorkload.getGraph(); - - InstantSource clock = graph.getEngine().getClock(); - - this.startOfInterval = clock.millis(); + this.startOfInterval = this.clock.millis(); } @Override @@ -89,6 +85,10 @@ public class CheckpointModel extends FlowNode { this.closeNode(); this.simWorkload = null; - this.graph = null; + } + + @Override + public Map> getConnectedEdges() { + return Map.of(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index da6b8334..faab5c56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -23,6 +23,11 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.PerformanceCounters; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -30,13 +35,15 @@ import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimChainWorkload} that composes multiple {@link SimWorkload}s. */ -final class SimChainWorkload extends SimWorkload implements FlowSupplier { +public final class SimChainWorkload extends SimWorkload implements FlowSupplier { private final LinkedList workloads; private int workloadIndex; private SimWorkload activeWorkload; - private double demand = 0.0f; - private double supply = 0.0f; + private double cpuDemand = 0.0f; + private double cpuSupply = 0.0f; + private double cpuCapacity = 0.0f; + private double d = 0.0f; private FlowEdge workloadEdge; private FlowEdge machineEdge; @@ -50,6 +57,10 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { private ChainWorkload snapshot; + private long lastUpdate; + private PerformanceCounters performanceCounters = new PerformanceCounters(); + private Consumer completion; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -79,24 +90,28 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { return checkpointIntervalScaling; } + public PerformanceCounters getPerformanceCounters() { + return performanceCounters; + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - SimChainWorkload(FlowSupplier supplier, ChainWorkload workload, long now) { - super(((FlowNode) supplier).getGraph()); + SimChainWorkload(FlowSupplier supplier, ChainWorkload workload) { + super(((FlowNode) supplier).getEngine()); this.snapshot = workload; - this.parentGraph = ((FlowNode) supplier).getGraph(); - this.parentGraph.addEdge(this, supplier); + new FlowEdge(this, supplier); - this.clock = this.parentGraph.getEngine().getClock(); this.workloads = new LinkedList<>(workload.getWorkloads()); this.checkpointInterval = workload.getCheckpointInterval(); this.checkpointDuration = workload.getCheckpointDuration(); this.checkpointIntervalScaling = workload.getCheckpointIntervalScaling(); + this.lastUpdate = clock.millis(); + if (checkpointInterval > 0) { this.createCheckpointModel(); } @@ -106,6 +121,15 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.onStart(); } + SimChainWorkload( + FlowSupplier supplier, ChainWorkload workload, SimMachine machine, Consumer completion) { + this(supplier, workload); + + this.capacity = machine.getCpu().getFrequency(); + this.d = 1 / machine.getCpu().getFrequency(); + this.completion = completion; + } + public Workload getNextWorkload() { this.workloadIndex++; return workloads.pop(); @@ -122,7 +146,25 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.checkpointModel.start(); } - this.activeWorkload = this.getNextWorkload().startWorkload(this, this.clock.millis()); + this.activeWorkload = this.getNextWorkload().startWorkload(this); + } + + public void updateCounters(long now) { + long lastUpdate = this.lastUpdate; + this.lastUpdate = now; + long delta = now - lastUpdate; + + if (delta > 0) { + final double factor = this.d * delta; + + this.performanceCounters.addCpuActiveTime(Math.round(this.cpuSupply * factor)); + this.performanceCounters.setCpuIdleTime(Math.round((this.cpuCapacity - this.cpuSupply) * factor)); + this.performanceCounters.addCpuStealTime(Math.round((this.cpuDemand - this.cpuSupply) * factor)); + } + + this.performanceCounters.setCpuDemand(this.cpuDemand); + this.performanceCounters.setCpuSupply(this.cpuSupply); + this.performanceCounters.setCpuCapacity(this.cpuCapacity); } @Override @@ -132,6 +174,16 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void stopWorkload() { + this.stopWorkload(null); + } + + private Exception stopWorkloadCause = null; + + public void stopWorkload(Exception cause) { + if (cause != null) { + this.stopWorkloadCause = cause; + } + if (this.checkpointModel != null) { this.checkpointModel.close(); this.checkpointModel = null; @@ -143,6 +195,11 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { } this.closeNode(); + + if (this.completion != null) { + this.completion.accept(stopWorkloadCause); + this.completion = null; + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -197,7 +254,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.demand = newDemand; + this.cpuDemand = newDemand; this.machineEdge.pushDemand(newDemand); } @@ -210,7 +267,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - this.supply = newSupply; + this.cpuSupply = newSupply; this.workloadEdge.pushSupply(newSupply); } @@ -222,6 +279,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { */ @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { + updateCounters(this.clock.millis()); + this.pushOutgoingDemand(this.machineEdge, newDemand); } @@ -233,6 +292,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { */ @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + updateCounters(this.clock.millis()); + this.pushOutgoingSupply(this.machineEdge, newSupply); } @@ -255,7 +316,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { // Start next workload if (!this.workloads.isEmpty()) { - this.activeWorkload = getNextWorkload().startWorkload(this, this.clock.millis()); + this.activeWorkload = getNextWorkload().startWorkload(this); return; } @@ -276,4 +337,14 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.stopWorkload(); } + + @Override + public Map> getConnectedEdges() { + List consumerEdges = (this.machineEdge != null) ? List.of(this.machineEdge) : List.of(); + List supplierEdges = (this.workloadEdge != null) ? List.of(this.workloadEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumerEdges, + FlowEdge.NodeType.SUPPLYING, supplierEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index ebfcc552..d27d90af 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -22,8 +22,8 @@ package org.opendc.simulator.compute.workload; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -37,10 +37,10 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer { /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this stage belongs to. */ - public SimWorkload(FlowGraph parentGraph) { - super(parentGraph); + public SimWorkload(FlowEngine engine) { + super(engine); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index d85669bb..1d069bae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.workload; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowSupplier; public interface Workload { @@ -32,5 +34,7 @@ public interface Workload { double getCheckpointIntervalScaling(); - SimWorkload startWorkload(FlowSupplier supplier, long now); + SimWorkload startWorkload(FlowSupplier supplier); + + SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer completion); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 46354d4c..9811f72d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -23,12 +23,13 @@ package org.opendc.simulator.compute.workload.trace; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -46,9 +47,9 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied private double remainingWork = 0.0; // The duration of the fragment at the demanded speed - private long checkpointDuration; + private final long checkpointDuration; - private TraceWorkload snapshot; + private final TraceWorkload snapshot; private ScalingPolicy scalingPolicy = new NoDelayScaling(); @@ -83,8 +84,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) { - super(((FlowNode) supplier).getGraph()); + public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload) { + super(((FlowNode) supplier).getEngine()); this.snapshot = workload; this.checkpointDuration = workload.getCheckpointDuration(); @@ -92,8 +93,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; - final FlowGraph graph = ((FlowNode) supplier).getGraph(); - graph.addEdge(this, supplier); + new FlowEdge(this, supplier); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -289,4 +289,9 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.stopWorkload(); } + + @Override + public Map> getConnectedEdges() { + return Map.of(FlowEdge.NodeType.CONSUMING, (this.machineEdge != null) ? List.of(this.machineEdge) : List.of()); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 47292a7b..80687b88 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -25,6 +25,8 @@ package org.opendc.simulator.compute.workload.trace; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; @@ -111,8 +113,13 @@ public class TraceWorkload implements Workload { } @Override - public SimWorkload startWorkload(FlowSupplier supplier, long now) { - return new SimTraceWorkload(supplier, this, now); + public SimWorkload startWorkload(FlowSupplier supplier) { + return new SimTraceWorkload(supplier, this); + } + + @Override + public SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer completion) { + return this.startWorkload(supplier); } public static Builder builder() { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index 49baaf48..bbd6feaa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -25,8 +25,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.workload.trace.TraceWorkload -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException /** * Run the specified [SimWorkloadNew] on this machine and suspend execution util [workload] has finished. @@ -43,8 +41,8 @@ public suspend fun SimMachine.runWorkload( return suspendCancellableCoroutine { cont -> cont.invokeOnCancellation { this@runWorkload.shutdown() } - startWorkload(workload) { cause -> - if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) - } +// startWorkload(workload) { cause -> +// if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) +// } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 3f18ac76..e5dbb7a8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -26,7 +26,6 @@ import java.time.Clock; import java.time.InstantSource; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -78,13 +77,6 @@ public final class FlowEngine implements Runnable { return clock; } - /** - * Return a new {@link FlowGraph} that can be used to build a flow network. - */ - public FlowGraph newGraph() { - return new FlowGraph(this); - } - /** * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. *

diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index c094560e..09cd73f6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -25,8 +25,11 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import org.opendc.simulator.engine.engine.FlowEngine; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList consumerEdges = new ArrayList<>(); @@ -45,8 +48,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double capacity; // What is the max capacity. Can probably be removed - public FlowDistributor(FlowGraph graph) { - super(graph); + public FlowDistributor(FlowEngine engine) { + super(engine); } public double getTotalIncomingDemand() { @@ -283,4 +286,11 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } + + @Override + public Map> getConnectedEdges() { + List supplyingEdges = (this.supplierEdge != null) ? List.of(this.supplierEdge) : List.of(); + + return Map.of(FlowEdge.NodeType.CONSUMING, supplyingEdges, FlowEdge.NodeType.SUPPLYING, this.consumerEdges); + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index 9521f2ce..95eac20b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -40,6 +40,11 @@ public class FlowEdge { private double capacity; + public enum NodeType { + CONSUMING, + SUPPLYING + } + public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { if (!(consumer instanceof FlowNode)) { throw new IllegalArgumentException("Flow consumer is not a FlowNode"); @@ -69,6 +74,24 @@ public class FlowEdge { } } + /** + * Close the edge of the specified node type. + * + * @param nodeType The type of connected node that is being closed. + */ + public void close(NodeType nodeType) { + if (nodeType == NodeType.CONSUMING) { + this.consumer = null; + this.supplier.removeConsumerEdge(this); + this.supplier = null; + } + if (nodeType == NodeType.SUPPLYING) { + this.supplier = null; + this.consumer.removeSupplierEdge(this); + this.consumer = null; + } + } + public FlowConsumer getConsumer() { return consumer; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java deleted file mode 100644 index 60d57785..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph; - -import java.util.ArrayList; -import java.util.HashMap; -import org.opendc.simulator.engine.engine.FlowEngine; - -public class FlowGraph { - private final FlowEngine engine; - private final ArrayList nodes = new ArrayList<>(); - private final ArrayList edges = new ArrayList<>(); - private final HashMap> nodeToEdge = new HashMap<>(); - - public FlowGraph(FlowEngine engine) { - this.engine = engine; - } - - /** - * Return the {@link FlowEngine} driving the simulation of the graph. - */ - public FlowEngine getEngine() { - return engine; - } - - /** - * Create a new {@link FlowNode} representing a node in the flow network. - */ - public void addNode(FlowNode node) { - if (nodes.contains(node)) { - System.out.println("Node already exists"); - } - nodes.add(node); - nodeToEdge.put(node, new ArrayList<>()); - long now = this.engine.getClock().millis(); - node.invalidate(now); - } - - /** - * Internal method to remove the specified {@link FlowNode} from the graph. - */ - public void removeNode(FlowNode node) { - - // Remove all edges connected to node - final ArrayList connectedEdges = nodeToEdge.get(node); - while (!connectedEdges.isEmpty()) { - removeEdge(connectedEdges.get(0)); - } - - nodeToEdge.remove(node); - - // remove the node - nodes.remove(node); - } - - /** - * Add an edge between the specified consumer and supplier in this graph. - */ - public FlowEdge addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { - // Check if the consumer and supplier are both FlowNodes - if (!(flowConsumer instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - if (!(flowSupplier instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - - // Check of the consumer and supplier are present in this graph - if (!(this.nodes.contains((FlowNode) flowConsumer))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); - } - if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The supplier is not a node in this graph"); - } - - final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); - - edges.add(flowEdge); - - nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); - nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); - - return flowEdge; - } - - public void removeEdge(FlowEdge flowEdge) { - final FlowConsumer consumer = flowEdge.getConsumer(); - final FlowSupplier supplier = flowEdge.getSupplier(); - nodeToEdge.get((FlowNode) consumer).remove(flowEdge); - nodeToEdge.get((FlowNode) supplier).remove(flowEdge); - - edges.remove(flowEdge); - flowEdge.close(); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index e24e9f93..cbfe39a3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -23,13 +23,15 @@ package org.opendc.simulator.engine.graph; import java.time.InstantSource; +import java.util.List; +import java.util.Map; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.engine.FlowEventQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link FlowNode} represents a node in a {@link FlowGraph}. + * A {@link FlowNode} represents a node in the {@link FlowEngine}. */ public abstract class FlowNode { private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); @@ -76,14 +78,6 @@ public abstract class FlowNode { this.clock = clock; } - public FlowGraph getParentGraph() { - return parentGraph; - } - - public void setParentGraph(FlowGraph parentGraph) { - this.parentGraph = parentGraph; - } - public FlowEngine getEngine() { return engine; } @@ -116,29 +110,22 @@ public abstract class FlowNode { private Boolean inCycleQueue = false; protected InstantSource clock; - protected FlowGraph parentGraph; protected FlowEngine engine; - /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} driving the simulation. */ - public FlowNode(FlowGraph parentGraph) { - this.parentGraph = parentGraph; - this.engine = parentGraph.getEngine(); + public FlowNode(FlowEngine engine) { + this.engine = engine; this.clock = engine.getClock(); - this.parentGraph.addNode(this); + this.invalidate(); } + public abstract Map> getConnectedEdges(); + /** * Invalidate the {@link FlowNode} forcing the stage to update. * @@ -228,8 +215,25 @@ public abstract class FlowNode { // Mark the stage as closed this.nodeState = NodeState.CLOSED; - // Remove stage from parent graph - this.parentGraph.removeNode(this); + // Get Connected Edges + Map> connectedEdges = getConnectedEdges(); + + // Remove connected edges + List consumerEdges = connectedEdges.get(FlowEdge.NodeType.CONSUMING); + if (consumerEdges != null) { + for (FlowEdge edge : consumerEdges) { + edge.close(FlowEdge.NodeType.CONSUMING); + } + } + + // Remove connected edges + List supplierEdges = connectedEdges.get(FlowEdge.NodeType.SUPPLYING); + + if (supplierEdges != null) { + for (FlowEdge edge : supplierEdges) { + edge.close(FlowEdge.NodeType.SUPPLYING); + } + } // Remove stage from the timer queue this.deadline = Long.MAX_VALUE; -- cgit v1.2.3