diff options
28 files changed, 2131 insertions, 287 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 791ab692..ced38480 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -33,8 +33,9 @@ import org.opendc.simulator.compute.power.SimPowerSource import org.opendc.simulator.compute.power.batteries.BatteryAggregator import org.opendc.simulator.compute.power.batteries.SimBattery import org.opendc.simulator.engine.engine.FlowEngine -import org.opendc.simulator.engine.graph.FlowDistributor import org.opendc.simulator.engine.graph.FlowEdge +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory.DistributionPolicy /** * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. @@ -66,7 +67,11 @@ public class HostsProvisioningStep internal constructor( simPowerSources.add(simPowerSource) service.addPowerSource(simPowerSource) - val hostDistributor = FlowDistributor(engine) + val hostDistributor = + FlowDistributorFactory.getFlowDistributor( + engine, + DistributionPolicy.MAX_MIN_FAIRNESS, + ) val carbonFragments = getCarbonFragments(cluster.powerSource.carbonTracePath) @@ -80,7 +85,11 @@ public class HostsProvisioningStep internal constructor( if (cluster.battery != null) { // Create Battery Distributor - val batteryDistributor = FlowDistributor(engine) + val batteryDistributor = + FlowDistributorFactory.getFlowDistributor( + engine, + DistributionPolicy.MAX_MIN_FAIRNESS, + ) FlowEdge(batteryDistributor, simPowerSource) // Create Battery diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt index 09a8fe64..c1617304 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt @@ -31,13 +31,12 @@ import org.opendc.compute.topology.specs.HostJSONSpec import org.opendc.compute.topology.specs.HostSpec import org.opendc.compute.topology.specs.PowerSourceSpec import org.opendc.compute.topology.specs.TopologySpec +import org.opendc.compute.topology.specs.toDistributionPolicy import org.opendc.simulator.compute.models.CpuModel import org.opendc.simulator.compute.models.GpuModel import org.opendc.simulator.compute.models.MachineModel import org.opendc.simulator.compute.models.MemoryUnit import org.opendc.simulator.compute.power.getPowerModel -import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicyFactory -import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicyFactory.DistributionPolicyType import java.io.File import java.io.InputStream @@ -189,9 +188,8 @@ private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec { units, unknownMemoryUnit, gpuUnits, - // TODO: Pass through - DistributionPolicyFactory.getDistributionStrategy(DistributionPolicyType.MaxMinFairness), - DistributionPolicyFactory.getDistributionStrategy(DistributionPolicyType.MaxMinFairness), + cpuDistributionPolicy.toDistributionPolicy(), + gpuDistributionPolicy.toDistributionPolicy(), ) val cpuPowerModel = @@ -227,6 +225,8 @@ private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec { machineModel, cpuPowerModel, gpuPowerModel, + cpuDistributionPolicy = cpuDistributionPolicy.toDistributionPolicy(), + gpuDistributionPolicy = gpuDistributionPolicy.toDistributionPolicy(), ) return hostSpec } diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt index 30a75896..8bfef464 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt @@ -24,6 +24,7 @@ package org.opendc.compute.topology.specs import org.opendc.simulator.compute.models.MachineModel import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory.DistributionPolicy /** * Description of a physical host that will be simulated by OpenDC and host the virtual machines. @@ -40,4 +41,6 @@ public data class HostSpec( val gpuPowerModel: PowerModel?, val embodiedCarbon: Double = 1000.0, val expectedLifetime: Double = 5.0, + val cpuDistributionPolicy: DistributionPolicy = DistributionPolicy.MAX_MIN_FAIRNESS, + val gpuDistributionPolicy: DistributionPolicy = DistributionPolicy.MAX_MIN_FAIRNESS, ) diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt index 62c3906a..f5c8ab31 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/TopologySpecs.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.power.batteries.policy.RunningMeanBatteryPol import org.opendc.simulator.compute.power.batteries.policy.RunningMeanPlusBatteryPolicy import org.opendc.simulator.compute.power.batteries.policy.SingleThresholdBatteryPolicy import org.opendc.simulator.engine.engine.FlowEngine +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory.DistributionPolicy /** * Definition of a Topology modeled in the simulation. @@ -80,6 +81,8 @@ public data class HostJSONSpec( val gpu: GPUJSONSpec? = null, val cpuPowerModel: PowerModelSpec = PowerModelSpec.DFLT, val gpuPowerModel: PowerModelSpec = PowerModelSpec.DFLT, + val cpuDistributionPolicy: DistributionPolicySpec = MaxMinFairnessDistributionPolicySpec(), + val gpuDistributionPolicy: DistributionPolicySpec = MaxMinFairnessDistributionPolicySpec(), ) /** @@ -160,6 +163,59 @@ public data class PowerModelSpec( } } +@Serializable +public sealed interface DistributionPolicySpec { + public val type: DistributionPolicy +} + +@Serializable +@SerialName("BEST_EFFORT") +public data class BestEffortDistributionPolicySpec( + override val type: DistributionPolicy = DistributionPolicy.BEST_EFFORT, + val updateIntervalLength: Long = 1000L, +) : DistributionPolicySpec + +@Serializable +@SerialName("EQUAL_SHARE") +public data class EqualShareDistributionPolicySpec( + override val type: DistributionPolicy = DistributionPolicy.EQUAL_SHARE, +) : DistributionPolicySpec + +@Serializable +@SerialName("FIRST_FIT") +public data class FirstFitDistributionPolicySpec( + override val type: DistributionPolicy = DistributionPolicy.FIRST_FIT, +) : DistributionPolicySpec + +@Serializable +@SerialName("FIXED_SHARE") +public data class FixedShareDistributionPolicySpec( + override val type: DistributionPolicy = DistributionPolicy.FIXED_SHARE, + val shareRatio: Double = 1.0, +) : DistributionPolicySpec + +public fun DistributionPolicySpec.toDistributionPolicy(): DistributionPolicy { + return when (this) { + is BestEffortDistributionPolicySpec -> + DistributionPolicy.BEST_EFFORT.apply { + setProperty("updateIntervalLength", updateIntervalLength) + } + is EqualShareDistributionPolicySpec -> DistributionPolicy.EQUAL_SHARE + is FixedShareDistributionPolicySpec -> + DistributionPolicy.FIXED_SHARE.apply { + setProperty("shareRatio", shareRatio) + } + is FirstFitDistributionPolicySpec -> DistributionPolicy.FIRST_FIT + is MaxMinFairnessDistributionPolicySpec -> DistributionPolicy.MAX_MIN_FAIRNESS + } +} + +@Serializable +@SerialName("MAX_MIN_FAIRNESS") +public data class MaxMinFairnessDistributionPolicySpec( + override val type: DistributionPolicy = DistributionPolicy.MAX_MIN_FAIRNESS, +) : DistributionPolicySpec + /** * Definition of a power source used for JSON input. * diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt new file mode 100644 index 00000000..68111bc1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt @@ -0,0 +1,782 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.workload.Task +import org.opendc.simulator.compute.workload.trace.TraceFragment +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory.DistributionPolicy +import java.util.ArrayList + +class DistributionPoliciesTest { + /** + * This test verifies that the DistributionPolicies are correctly loaded from the configuration. + */ + @Test + fun distributionPoliciesTest() { + val maxMinFairnessGpuTopology = createTopology("DistributionPolicies/maxMinFairness/multi_gpu_host.json") + val equalShareGpuTopology = createTopology("DistributionPolicies/equalShare/dual_core_gpu_host.json") + val fixedShareGpuTopology = createTopology("DistributionPolicies/fixedShare/multi_gpu_host.json") + val bestEffortGpuTopology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + + assertAll( + { + assertEquals( + DistributionPolicy.MAX_MIN_FAIRNESS, + maxMinFairnessGpuTopology[0].hostSpecs[0].model.gpuDistributionStrategy, + "MaxMinFairnessDistributionPolicy should be used", + ) + }, + { + assertEquals( + DistributionPolicy.EQUAL_SHARE, + equalShareGpuTopology[0].hostSpecs[0].model.gpuDistributionStrategy, + "EqualShareDistributionPolicy should be used", + ) + }, + { + assertEquals( + DistributionPolicy.FIXED_SHARE, + fixedShareGpuTopology[0].hostSpecs[0].model.gpuDistributionStrategy, + "FixedShareDistributionPolicy should be used", + ) + }, + { + assertEquals( + 0.5, + fixedShareGpuTopology[0].hostSpecs[0].model.gpuDistributionStrategy.getProperty("shareRatio"), + "FixedShareDistributionPolicy should have a share ratio of 0.5", + ) + }, + { + assertEquals( + DistributionPolicy.BEST_EFFORT, + bestEffortGpuTopology[0].hostSpecs[0].model.gpuDistributionStrategy, + "BestEffortDistributionPolicy should be used", + ) + }, + ) + } + + /** + * This test verifies that the [EqualShareDistributionPolicy] correctly distributes supply according to the number of suppliers. + * The supply is divided equally among all suppliers. + */ + @Test + fun equalShareDistributionPolicyTest1() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 1), + ), + ), + ) + + val singleGpuTopology = createTopology("DistributionPolicies/equalShare/dual_core_gpu_host.json") + val doubleGpuTopology = createTopology("DistributionPolicies/equalShare/multi_gpu_host.json") + + val singleMonitor = runTest(singleGpuTopology, workload) + val doubleMonitor = runTest(doubleGpuTopology, workload) + + assertAll( + // single gpu + { assertEquals(2000.0, singleMonitor.taskGpuDemands["0"]?.get(1), "Single GPU demand in task \"0\" should be 2000.0") }, + { assertEquals(4000.0, singleMonitor.taskGpuSupplied["0"]?.get(1), "Single GPU demand in task \"0\" should be 2000.0") }, + { + assertEquals( + 4000.0, + singleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "Single GPU demand at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 4000.0, + singleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "Single GPU supply at host \"DualGpuHost\" should be 2000.0", + ) + }, + // double gpu + { assertEquals(2000.0, doubleMonitor.taskGpuDemands["0"]?.get(1), "Double GPU demand in task \"0\" should be 2000.0") }, + { assertEquals(4000.0, doubleMonitor.taskGpuSupplied["0"]?.get(1), "Double GPU supplied in task \"0\" should be 4000.0") }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "Double GPU demand for GPU 0 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "Double GPU supplied for GPU 0 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), + "Double GPU demand for GPU 1 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), + "Double GPU supplied for GPU 1 at host \"DualGpuHost\" should be 2000.0", + ) + }, + ) + } + + /** + * This test verifies that the [EqualShareDistributionPolicy] correctly distributes supply according to the number of suppliers. + * The supply is divided equally among all suppliers. + */ + @Test + fun equalShareDistributionPolicyTest2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 4000.0, 2), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 4000.0, 2), + ), + ), + ) + + val singleGpuTopology = createTopology("DistributionPolicies/equalShare/dual_core_gpu_host.json") + val doubleGpuTopology = createTopology("DistributionPolicies/equalShare/multi_gpu_host.json") + + val singleMonitor = runTest(singleGpuTopology, workload) + val doubleMonitor = runTest(doubleGpuTopology, workload) + + assertAll( + // single gpu + // task 0 + { assertEquals(4000.0, singleMonitor.taskGpuDemands["0"]?.get(1), "Single GPU demand in task \"0\" should be 4000.0") }, + { assertEquals(2000.0, singleMonitor.taskGpuSupplied["0"]?.get(1), "Single GPU supplied in task \"0\" should be 2000.0") }, + // task 1 + { assertEquals(4000.0, singleMonitor.taskGpuDemands["1"]?.get(1), "Single GPU demand in task \"0\" should be 4000.0") }, + { assertEquals(2000.0, singleMonitor.taskGpuSupplied["1"]?.get(1), "Single GPU supplied in task \"0\" should be 2000.0") }, + // host + { + assertEquals( + 4000.0, + singleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "Single GPU demand at host \"DualGpuHost\" should be 4000.0", + ) + }, + { + assertEquals( + 4000.0, + singleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "Single GPU supply at host \"DualGpuHost\" should be 4000.0", + ) + }, + // double gpu + // task 0 + { assertEquals(4000.0, doubleMonitor.taskGpuDemands["0"]?.get(1), "Double GPU demand in task \"0\" should be 4000.0") }, + { assertEquals(2000.0, doubleMonitor.taskGpuSupplied["0"]?.get(1), "Double GPU supply in task \"0\" should be 2000.0") }, + // task 1 + { assertEquals(4000.0, doubleMonitor.taskGpuDemands["1"]?.get(1), "Double GPU demand in task \"0\" should be 4000.0") }, + { assertEquals(2000.0, doubleMonitor.taskGpuSupplied["1"]?.get(1), "Double GPU supply in task \"0\" should be 2000.0") }, + // host + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "Double GPU demand for GPU 0 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "Double GPU supply for GPU 0 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), + "Double GPU demand for GPU 1 at host \"DualGpuHost\" should be 2000.0", + ) + }, + { + assertEquals( + 2000.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), + "Double GPU supply for GPU 1 at host \"DualGpuHost\" should be 2000.0", + ) + }, + ) + } + + /** + * This test verifies that the [FixedShareDistributionPolicy] correctly distributes supply according to the fixed share. + * The supply is divided according to the fixed share defined for each supplier. + */ + @Test + fun fixedShareDistributionPolicyTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 4000.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/fixedShare/multi_gpu_host.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(4000.0, monitor.taskGpuDemands["0"]?.get(1), "Task GPU demand should be 4000.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task GPU supplied should be 1000.0") }, + // Host + { + assertEquals( + 1000.0, + monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 demand at host should be 1000.0 (50% of the capacity)", + ) + }, + { assertEquals(1000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supplied at host should be 1000.0") }, + { + assertEquals( + 1000.0, + monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 demand at host should be 1000.0 (50% of the capacity)", + ) + }, + { assertEquals(1000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supplied at host should be 1000.0") }, + ) + } + + /** + * This test verifies that the [FixedShareDistributionPolicy] correctly handles resource contention. + * When total demand exceeds available supply, resources should be distributed according to the fixed share ratio. + */ + @Test + fun fixedShareDistributionPolicyContentionTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 6000.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/fixedShare/multi_gpu_host.json") + + val monitor = runTest(topology, workload) + + // With demand of 6000.0 but total GPU capacity of 4000.0 (2 GPUs * 2000.0 each) + // Fixed share ratio of 0.5 means each GPU gets 50% of available capacity = 2000.0 each + // Total supplied should be 4000.0 (limited by total capacity) + assertAll( + { assertEquals(6000.0, monitor.taskGpuDemands["0"]?.get(1), "Task GPU demand should be 6000.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task GPU supplied should be 1000.0 (limited by the capacity)") }, + // Host + { + assertEquals( + 1000.0, + monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 demand at host should be 1000.0 (50% of the gpu capacity)", + ) + }, + { + assertEquals( + 1000.0, + monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 supplied at host should be 1000.0 (limited by GPU capacity)", + ) + }, + { + assertEquals( + 1000.0, + monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 demand at host should be 1000.0 (50% of the gpu capacity)", + ) + }, + { + assertEquals( + 1000.0, + monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 supplied at host should be 1000.0 (limited by GPU capacity)", + ) + }, + ) + } + + /** + * This test verifies that the [FixedShareDistributionPolicy] correctly handles multiple tasks competing for resources. + * Resources should be distributed proportionally according to the fixed share ratio among all tasks. + */ + @Test + fun fixedShareDistributionPolicyMultipleTasksTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 3000.0, 2), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 3000.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/fixedShare/multi_gpu_host.json") + + val monitor = runTest(topology, workload) + + // Total demand: 6000.0 (3000.0 from each task) + // Total capacity: 4000.0 (2 GPUs * 2000.0 each) + // So each task gets 1000.0, distributed as 1000.0 per GPU (50% share ratio) + assertAll( + // Task 0 + { assertEquals(3000.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 GPU demand should be 3000.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task 0 GPU supplied should be 1000.0") }, + // Task 1 + { assertEquals(3000.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 GPU demand should be 3000.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 1 GPU supplied should be 1000.0") }, + // Host + { assertEquals(1000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 total demand at host should be 1000.0") }, + { + assertEquals( + 1000.0, + monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 total supplied at host should be 1000.0", + ) + }, + { assertEquals(1000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 total demand at host should be 1000.0") }, + { + assertEquals( + 1000.0, + monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 total supplied at host should be 1000.0", + ) + }, + ) + } + + /** + * This test verifies that the [BestEffortDistributionPolicy] correctly distributes supply based on demand + * when resources are abundant. It should satisfy all demands and distribute remaining capacity optimally. + */ + @Test + fun bestEffortDistributionPolicyBasicTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 1500.0, 1), + ), + ), + ) + + val singleGpuTopology = createTopology("DistributionPolicies/bestEffort/dual_core_gpu_host.json") + val doubleGpuTopology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + + val singleMonitor = runTest(singleGpuTopology, workload) + val doubleMonitor = runTest(doubleGpuTopology, workload) + + assertAll( + // single gpu - should satisfy demand and utilize remaining capacity + { assertEquals(1500.0, singleMonitor.taskGpuDemands["0"]?.get(1), "Single GPU demand in task \"0\" should be 1500.0") }, + { assertEquals(1500.0, singleMonitor.taskGpuSupplied["0"]?.get(1)) { "Single GPU should supply the demanded 1500.0" } }, + // Host + { + assertEquals( + 1500.0, + singleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "Single GPU demand at host \"DualGpuHost\" should be 1500.0", + ) + }, + { + assertEquals( + 1500.0, + singleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "Single GPU supply at host \"DualGpuHost\" should be 1500.0", + ) + }, + // double gpu - should distribute across both GPUs and utilize remaining capacity + { assertEquals(1500.0, doubleMonitor.taskGpuDemands["0"]?.get(1), "Double GPU demand in task \"0\" should be 1500.0") }, + { assertEquals(1500.0, doubleMonitor.taskGpuSupplied["0"]?.get(1), "Double GPU should supply the demanded 1500.0") }, + // Host + { + assertEquals( + 1500.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 demand at host \"DualGpuHost\" should be 750.0", + ) + }, + { + assertEquals( + 1500.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), + "GPU 0 supply at host \"DualGpuHost\" should be 750.0", + ) + }, + { + assertEquals( + 0.0, + doubleMonitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 demand at host \"DualGpuHost\" should be 750.0", + ) + }, + { + assertEquals( + 0.0, + doubleMonitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), + "GPU 1 supply at host \"DualGpuHost\" should be 750.0", + ) + }, + ) + } + + /** + * This test verifies that the [BestEffortDistributionPolicy] correctly handles resource contention + * by using round-robin distribution when demand exceeds supply. + */ + @Test + fun bestEffortDistributionPolicyContentionTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 3000.0, 2), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2500.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + val monitor = runTest(topology, workload) + + // Total demand: 5500.0 (3000.0 + 2500.0) + // Total capacity: 4000.0 (2 GPUs * 2000.0 each) + // Best effort should distribute proportionally based on demand while using round-robin + assertAll( + // Task 0 + { assertEquals(3000.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 GPU demand should be 3000.0") }, + { assertEquals(3000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task 0 GPU supply should be 1000.0") }, + // Task 1 + { assertEquals(2500.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 GPU demand should be 2500.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 0 GPU supply should be 1000.0") }, + // Host + { assertEquals(2750.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supplied at host should be 2000.0") }, + { assertEquals(2750.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supplied at host should be 2000.0") }, + ) + } + + /** + * This test verifies that the [BestEffortDistributionPolicy] prioritizes already utilized resources + * when supply exceeds demand, demonstrating the efficiency optimization principle. + */ + @Test + fun bestEffortDistributionPolicyUtilizationOptimizationTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 1000.0, 1), + ), + ), + ) + + val doubleGpuTopology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + val monitor = runTest(doubleGpuTopology, workload) + + // With low demand (1000.0) and high capacity (4000.0), best effort should: + // 1. Satisfy the demand + // 2. Utilize remaining capacity efficiently + assertAll( + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1), "Task GPU demand should be 1000.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task GPU supplied should be 1000.0") }, + // host + { assertEquals(1000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 demand at host should be 1000.0") }, + { assertEquals(1000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supplied at host should be 1000.0") }, + { assertEquals(0.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 demand at host should be 0.0") }, + { assertEquals(0.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supplied at host should be 0.0") }, + ) + } + + /** + * This test verifies that the [BestEffortDistributionPolicy] handles varying demands correctly + * and does not distribute the resources equally. + */ + @Test + fun bestEffortDistributionPolicyVaryingDemandsTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 3500.0, 2), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 500.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + + val monitor = runTest(topology, workload) + + // Best effort should prioritize the high-demand task differently than equal share + assertAll( + // Best effort should allocate more to high-demand task compared to equal share + { assertEquals(3500.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 demand should be 3500.0") }, + { assertEquals(3500.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 supply should be 3500.0") }, + { assertEquals(500.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 demand should be 500.0") }, + { assertEquals(500.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 1 supply should be 500.0") }, + // Host + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supplied at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supplied at host should be 2000.0") }, + ) + } + + /** + * This test verifies that the [BestEffortDistributionPolicy] maintains fairness over time + * through its round-robin mechanism when resources are constrained. + */ + @Test + fun bestEffortDistributionPolicyFairnessTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 2), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 2), + ), + ), + createTestTask( + name = "2", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 2), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/bestEffort/multi_gpu_host.json") + val monitor = runTest(topology, workload) + + // With equal demands (2000.0 each) and limited capacity (4000.0 total) + // Best effort should distribute fairly among all tasks in a round-robin manner + assertAll( + // Task Demands at start + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 demand should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 demand should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuDemands["2"]?.get(1), "Task 2 demand should be 2000.0") }, + // Task supplies at start + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task 0 supply at the start should be 2000.0") }, + { assertEquals(0.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 1 supply at the start should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuSupplied["2"]?.get(1), "Task 2 supply at the start should be 0.0") }, + // Task supplies second step + { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(2), "Task 0 supply at the second step should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(2), "Task 1 supply at the second step should be 0.0") }, + { assertEquals(2000.0, monitor.taskGpuSupplied["2"]?.get(2), "Task 2 supply at the second step should be 2000.0") }, + // Task supplies third step + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(3), "Task 0 supply at the third step should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(3), "Task 1 supply at the third step should be 2000.0") }, + { assertEquals(0.0, monitor.taskGpuSupplied["2"]?.get(3), "Task 2 supply at the third step should be 0.0") }, + // Host + // At start + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supplied at host should be 2000.0") }, + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supplied at host should be 2000.0") }, + // Next Round + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(2)?.get(0), "GPU 0 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(2)?.get(0), "GPU 0 supplied at host should be 2000.0") }, + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(2)?.get(1), "GPU 1 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(2)?.get(1), "GPU 1 supplied at host should be 2000.0") }, + // Next Round + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(3)?.get(0), "GPU 0 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(3)?.get(0), "GPU 0 supplied at host should be 2000.0") }, + { assertEquals(3000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(3)?.get(1), "GPU 1 demand at host should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(3)?.get(1), "GPU 1 supplied at host should be 2000.0") }, + ) + } + + /** + * This test verifies that the [FirstFitDistributionPolicy] places workloads on the first GPU + * before utilizing the second GPU, demonstrating the First Fit allocation strategy. + * All tasks should be satisfied as total demand is within available capacity. + */ + @Test + fun firstFitDistributionPolicyGpuPlacementTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 1500.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 1000.0, 1), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/firstFit/multi_gpu_host.json") + val monitor = runTest(topology, workload) + + // With First Fit policy, tasks should be placed on first GPU before second GPU + // Total demand (2500.0) is less than total capacity (4000.0), so all should be satisfied + assertAll( + // Task demands should remain as requested + { assertEquals(1500.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 GPU demand should be 1500.0") }, + { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 GPU demand should be 1000.0") }, + // All tasks should be fully satisfied + { assertEquals(1500.0, monitor.taskGpuSupplied["0"]?.get(1), "Task 0 GPU supply should be 1500.0") }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 1 GPU supply should be 1000.0") }, + // First GPU should handle both tasks (total 2500.0, within its 2000.0 capacity limit per task) + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 demand should be 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 supply should be 2000.0") }, + // Second GPU should have remaining demand + { assertEquals(500.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 demand should be 500.0") }, + { assertEquals(500.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 supply should be 500.0") }, + ) + } + + /** + * This test verifies that the [FirstFitDistributionPolicy] correctly handles scenarios + * where overall demand exceeds total available supply. Some tasks should receive no supply + * if they cannot be satisfied by a single GPU. + */ + @Test + fun firstFitDistributionPolicyOverdemandTest() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 2000.0, 1), + ), + ), + createTestTask( + name = "2", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 0.0, 0, 1500.0, 1), + ), + ), + ) + + val topology = createTopology("DistributionPolicies/firstFit/multi_gpu_host.json") + val monitor = runTest(topology, workload) + + // With First Fit policy and total demand (5500.0) > total capacity (4000.0), + // only tasks that can fit on individual GPUs should be satisfied + assertAll( + // Task demands should remain as requested + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1), "Task 0 GPU demand should be 2000.0") }, + { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1), "Task 1 GPU demand should be 2000.0") }, + { assertEquals(1500.0, monitor.taskGpuDemands["2"]?.get(1), "Task 2 GPU demand should be 1500.0") }, + // First two tasks should be satisfied (each fits on one GPU) + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1), "Task 0 should be fully satisfied") }, + { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(1), "Task 1 should be fully satisfied") }, + // Third task should receive no supply as no single GPU can satisfy it after first two are allocated + { assertEquals(0.0, monitor.taskGpuSupplied["2"]?.get(1), "Task 2 should receive no supply") }, + // Both GPUs should be fully utilized by the first two tasks + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0), "GPU 0 should have 2000.0 demand") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0), "GPU 0 should supply 2000.0") }, + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1), "GPU 1 should have 2000.0 demand") }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1), "GPU 1 should supply 2000.0") }, + ) + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt index 8f71b7e7..efb83814 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt @@ -109,7 +109,7 @@ class SchedulerTest { ) // Topology with 1 host having 2 GPUs (both tasks can fit on one host) - val fittingTopology = createTopology("Gpus/dual_gpu_host.json") + val fittingTopology = createTopology("Gpus/dual_core_gpu_host.json") // Topology with 2 hosts each having 1 GPU (tasks must be distributed) val nonFittingTopology = createTopology("Gpus/single_gpu_hosts.json") diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/dual_core_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/dual_core_gpu_host.json new file mode 100644 index 00000000..a2758fd1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/dual_core_gpu_host.json @@ -0,0 +1,42 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "cpuDistributionPolicy": { + "type": "MAX_MIN_FAIRNESS" + }, + "gpu": { + "coreCount": 2, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "BEST_EFFORT", + "updateIntervalLength": 60000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/multi_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/multi_gpu_host.json new file mode 100644 index 00000000..8169793d --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/bestEffort/multi_gpu_host.json @@ -0,0 +1,40 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpu": { + "count": 2, + "coreCount": 1, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "BEST_EFFORT", + "updateIntervalLength": 60000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/dual_core_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/dual_core_gpu_host.json new file mode 100644 index 00000000..aa128cba --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/dual_core_gpu_host.json @@ -0,0 +1,41 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "cpuDistributionPolicy": { + "type": "MAX_MIN_FAIRNESS" + }, + "gpu": { + "coreCount": 2, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "EQUAL_SHARE" + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/multi_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/multi_gpu_host.json new file mode 100644 index 00000000..59b0dbd2 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/equalShare/multi_gpu_host.json @@ -0,0 +1,42 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "cpuDistributionPolicy": { + "type": "MAX_MIN_FAIRNESS" + }, + "gpu": { + "count": 2, + "coreCount": 1, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "EQUAL_SHARE" + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/firstFit/multi_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/firstFit/multi_gpu_host.json new file mode 100644 index 00000000..306431ca --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/firstFit/multi_gpu_host.json @@ -0,0 +1,40 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpu": { + "count": 2, + "coreCount": 1, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "FIRST_FIT" + } + } + ] + } + ] +} + diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/fixedShare/multi_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/fixedShare/multi_gpu_host.json new file mode 100644 index 00000000..2aa3a057 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/fixedShare/multi_gpu_host.json @@ -0,0 +1,40 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpu": { + "count": 2, + "coreCount": 1, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "FIXED_SHARE", + "shareRatio": 0.5 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/maxMinFairness/multi_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/maxMinFairness/multi_gpu_host.json new file mode 100644 index 00000000..74df2f8c --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/DistributionPolicies/maxMinFairness/multi_gpu_host.json @@ -0,0 +1,39 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "DualGpuHost", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpu": { + "count": 2, + "coreCount": 1, + "coreSpeed": 2000 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpuDistributionPolicy": { + "type": "MAX_MIN_FAIRNESS" + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/dual_gpu_host.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/dual_core_gpu_host.json index c5271ff8..c5271ff8 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/dual_gpu_host.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/dual_core_gpu_host.json diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index c9e3ab8c..5f4a4fcd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -46,6 +46,7 @@ import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory; /** * A machine that is able to execute {@link SimWorkload} objects. @@ -220,7 +221,9 @@ public class SimMachine { -1); // Create a FlowDistributor and add the cpu as supplier - this.distributors.put(ResourceType.CPU, new FlowDistributor(engine)); + this.distributors.put( + ResourceType.CPU, + FlowDistributorFactory.getFlowDistributor(engine, this.machineModel.getCpuDistributionStrategy())); new FlowEdge( this.distributors.get(ResourceType.CPU), (FlowSupplier) this.computeResources.get(ResourceType.CPU).getFirst(), @@ -232,7 +235,9 @@ public class SimMachine { this.memory = new Memory(engine, this.machineModel.getMemory()); if (this.availableResources.contains(ResourceType.GPU)) { - this.distributors.put(ResourceType.GPU, new FlowDistributor(engine)); + this.distributors.put( + ResourceType.GPU, + FlowDistributorFactory.getFlowDistributor(engine, this.machineModel.getGpuDistributionStrategy())); ArrayList<ComputeResource> gpus = new ArrayList<>(); for (GpuModel gpuModel : machineModel.getGpuModels()) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java index e11d9cf2..f8cafc35 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Objects; import org.jetbrains.annotations.Nullable; import org.opendc.common.ResourceType; -import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy; +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory.DistributionPolicy; /** * A description of the physical or virtual machine on which a bootable image runs. @@ -85,8 +85,8 @@ public final class MachineModel { cpus.get(0).getArchitecture()), memory, null, - null, - null); + DistributionPolicy.MAX_MIN_FAIRNESS, + DistributionPolicy.MAX_MIN_FAIRNESS); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java index 8922a97d..a8944efe 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java @@ -209,7 +209,6 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { long delta = now - lastUpdate; for (ResourceType resourceType : this.availableResources) { - int i = 0; final double factor = this.resourceTimeScalingFactor.get(resourceType) * delta; if (delta > 0) { this.resourcePerformanceCounters diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index f7fc2728..cae3e8a1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -27,49 +27,49 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import org.opendc.common.ResourceType; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy; -import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy; +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); - private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); - private HashMap<Integer, FlowEdge> supplierEdges = +/** + * A {@link FlowDistributor} is a node that distributes supply from multiple suppliers to multiple consumers. + * It can be used to model host-level resource distribution, such as CPU, memory, and GPU distribution. + * It is a subclass of {@link FlowNode} and implements both {@link FlowSupplier} and {@link FlowConsumer}. + * It maintains a list of consumer edges and supplier edges, and it can handle incoming demands and supplies. + * It also provides methods to update outgoing demands and supplies based on the incoming demands and supplies. + * This class is abstract and should be extended by specific implementations that define the distribution strategy. + * It uses a {@link FlowDistributorFactory.DistributionPolicy} to determine how to distribute the supply among the consumers. + * The default distribution policy is {@link MaxMinFairnessPolicy}, which distributes the supply fairly among the consumers. + */ +public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); + protected final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); + protected HashMap<Integer, FlowEdge> supplierEdges = new HashMap<>(); // The suppliers that provide supply to this distributor - private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers + protected final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers + protected final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalIncomingDemand; // The total demand of all the consumers + protected double totalIncomingDemand; // The total demand of all the consumers // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable - private HashMap<Integer, Double> currentIncomingSupplies = + protected HashMap<Integer, Double> currentIncomingSupplies = new HashMap<>(); // The current supply provided by the suppliers - private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers - - private boolean outgoingDemandUpdateNeeded = false; - private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle + protected Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers - private ResourceType supplierResourceType; - private ResourceType consumerResourceType; + protected boolean outgoingDemandUpdateNeeded = false; + protected Set<Integer> updatedDemands = + new HashSet<>(); // Array of consumers that updated their demand in this cycle - private boolean overloaded = false; + protected ResourceType supplierResourceType; + protected ResourceType consumerResourceType; - private double capacity; // What is the max capacity. Can probably be removed - private DistributionPolicy distributionPolicy; + protected double capacity; // What is the max capacity. Can probably be removed public FlowDistributor(FlowEngine engine) { super(engine); - this.distributionPolicy = new MaxMinFairnessPolicy(); - } - - public FlowDistributor(FlowEngine engine, DistributionPolicy distributionPolicy) { - super(engine); - this.distributionPolicy = distributionPolicy; } public double getTotalIncomingDemand() { @@ -100,64 +100,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return Long.MAX_VALUE; } - private void updateOutgoingDemand() { - // equally distribute the demand to all suppliers - for (FlowEdge supplierEdge : this.supplierEdges.values()) { - this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); - // alternatively a relative share could be used, based on capacity minus current incoming supply - // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - - // currentIncomingSupplies.get(idx) / supplierEdges.size())); - } - - this.outgoingDemandUpdateNeeded = false; - - this.invalidate(); - } + protected abstract void updateOutgoingDemand(); // TODO: This should probably be moved to the distribution strategy - private void updateOutgoingSupplies() { - - // If the demand is higher than the current supply, the system is overloaded. - // The available supply is distributed based on the current distribution function. - if (this.totalIncomingDemand > this.totalIncomingSupply) { - this.overloaded = true; - - double[] supplies = this.distributionPolicy.distributeSupply( - this.incomingDemands, - new ArrayList<>(this.currentIncomingSupplies.values()), - this.totalIncomingSupply); + protected abstract void updateOutgoingSupplies(); - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); - } - - } else { - - // If the distributor was overloaded before, but is not anymore: - // provide all consumers with their demand - if (this.overloaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { - this.pushOutgoingSupply( - this.consumerEdges.get(idx), - this.incomingDemands.get(idx), - this.getConsumerResourceType()); - } - } - this.overloaded = false; - } - - // Update the supplies of the consumers that changed their demand in the current cycle - else { - for (int idx : this.updatedDemands) { - this.pushOutgoingSupply( - this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); - } - } - } - - this.updatedDemands.clear(); - } + public abstract double[] distributeSupply( + ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply); /** * Add a new consumer. @@ -322,4 +271,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public ResourceType getConsumerResourceType() { return this.consumerResourceType; } + + public Boolean hasSupplierEdges() { + for (FlowEdge edge : this.supplierEdges.values()) { + if (edge != null) { + return true; + } + } + return false; + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java new file mode 100644 index 00000000..4a13beb2 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Objects; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A Best Effort Flow Distributor that implements a timesliced round-robin approach. + * + * Key principles: + * - Timesliced Round Robin: Resources are allocated to consumers in a round-robin manner + * - Non-Guaranteed Shares: No fixed allocation per consumer, distribution based on current demand + * - Optimized Utilization: Maximizes resource utilization during idle periods + * + * This scheduler is suitable for environments with fluctuating workloads where fairness + * is less important than maximizing overall resource utilization. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + */ +public class BestEffortFlowDistributor extends FlowDistributor { + + private int currentRoundRobinIndex = 0; + private boolean overloaded = false; + private final long roundRobinInterval; + private long lastRoundRobinUpdate; + + public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval) { + super(flowEngine); + this.roundRobinInterval = roundRobinInterval; + this.lastRoundRobinUpdate = -roundRobinInterval; + } + + /** + * Updates the outgoing demand based on the total incoming demand. + * Prioritizes already utilized suppliers when potential supply exceeds demand. + */ + @Override + protected void updateOutgoingDemand() { + + // If potential supply exceeds demand, prioritize already utilized suppliers + if (this.capacity > this.totalIncomingDemand && this.totalIncomingDemand > 0) { + // Best-effort: try to satisfy demand using already active suppliers first + double remainingDemand = this.totalIncomingDemand; + + // Phase 1: Prioritize suppliers that are currently providing supply + for (var entry : this.supplierEdges.entrySet()) { + int supplierIdx = entry.getKey(); + FlowEdge supplierEdge = entry.getValue(); + double currentSupply = this.currentIncomingSupplies.get(supplierIdx); + + if (currentSupply > 0 && remainingDemand > 0) { + // Try to satisfy as much demand as possible from this already active supplier + double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity()); + this.pushOutgoingDemand(supplierEdge, demandForThisSupplier); + remainingDemand -= demandForThisSupplier; + } + } + + // Phase 2: If demand still remains, use inactive suppliers + if (remainingDemand > 0) { + for (var entry : this.supplierEdges.entrySet()) { + int supplierIdx = entry.getKey(); + FlowEdge supplierEdge = entry.getValue(); + double currentSupply = this.currentIncomingSupplies.get(supplierIdx); + + if (currentSupply == 0 && remainingDemand > 0) { + double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity()); + this.pushOutgoingDemand(supplierEdge, demandForThisSupplier); + remainingDemand -= demandForThisSupplier; + } + } + } + } else { + // System is overloaded or no demand: distribute demand equally across all suppliers + double demandPerSupplier = this.totalIncomingDemand / this.supplierEdges.size(); + + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, demandPerSupplier); + } + } + + this.outgoingDemandUpdateNeeded = false; + this.invalidate(); + } + + /** + * Updates the outgoing supplies using a best-effort approach. + * When overloaded, uses round-robin distribution. Otherwise, satisfies demands optimally. + */ + @Override + protected void updateOutgoingSupplies() { + // Check if system is overloaded (demand exceeds supply) + if (this.totalIncomingDemand > this.totalIncomingSupply) { + this.overloaded = true; + + // Use the distribution algorithm for supply allocation + double[] supplies = this.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + } + } else { + // System is not overloaded - satisfy all demands and utilize remaining capacity + + if (this.overloaded) { + // Transitioning from overloaded to non-overloaded state + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), + this.incomingDemands.get(idx), + this.getConsumerResourceType()); + } + } + this.overloaded = false; + } else { + // Update supplies for consumers that changed their demand + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + } + } + } + + this.updatedDemands.clear(); + } + + /** + * Distributes available supply using a best-effort, round-robin approach. + * Algorithm: + * 1. First pass: Satisfy demands up to available capacity in round-robin order + * 2. Second pass: Distribute remaining capacity to consumers with unsatisfied demand + * 3. Optimize utilization by giving extra capacity to active consumers + */ + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = this.consumerEdges.size(); + double[] allocation = new double[numConsumers]; + + if (numConsumers == 0 || totalSupply <= 0) { + return allocation; + } + + double remainingSupply = totalSupply; + + // Phase 1: Round-robin distribution based on demand + // Start from the current round-robin index to ensure fairness over time + for (int round = 0; round < numConsumers && remainingSupply > 0; round++) { + int idx = (currentRoundRobinIndex + round) % numConsumers; + double demand = demands.get(idx); + + if (demand > allocation[idx]) { + // Calculate how much we can allocate in this round + double unmetDemand = demand - allocation[idx]; + double toAllocate = Math.min(unmetDemand, remainingSupply); + + allocation[idx] += toAllocate; + remainingSupply -= toAllocate; + } + } + + // Phase 2: Distribute any remaining supply to maximize utilization + // Give preference to consumers with the highest relative demand + if (remainingSupply > 0) { + // Create a list of consumers with unsatisfied demand, sorted by relative need + ArrayList<Integer> unsatisfiedConsumers = new ArrayList<>(); + for (int i = 0; i < numConsumers; i++) { + if (demands.get(i) > allocation[i]) { + unsatisfiedConsumers.add(i); + } + } + + // If no unsatisfied demand, distribute remaining capacity equally among active consumers + if (unsatisfiedConsumers.isEmpty()) { + // Find consumers with any demand (active consumers) + ArrayList<Integer> activeConsumers = new ArrayList<>(); + for (int i = 0; i < numConsumers; i++) { + if (demands.get(i) > 0) { + activeConsumers.add(i); + } + } + + if (!activeConsumers.isEmpty()) { + double extraPerConsumer = remainingSupply / activeConsumers.size(); + for (int idx : activeConsumers) { + allocation[idx] += extraPerConsumer; + } + } + } else { + // Distribute remaining supply proportionally to unsatisfied demand + double totalUnsatisfiedDemand = 0; + for (int idx : unsatisfiedConsumers) { + totalUnsatisfiedDemand += demands.get(idx) - allocation[idx]; + } + + for (int idx : unsatisfiedConsumers) { + double unsatisfiedDemand = demands.get(idx) - allocation[idx]; + double proportion = unsatisfiedDemand / totalUnsatisfiedDemand; + allocation[idx] += remainingSupply * proportion; + } + } + } + + // Update round-robin index for next allocation cycle + if (numConsumers > 0) { + currentRoundRobinIndex = (currentRoundRobinIndex + 1) % numConsumers; + } + + return allocation; + } + + /** + * Enhanced onUpdate method that implements time-sliced round-robin scheduling. + * This method ensures the round-robin index advances at regular intervals, + * creating true time-sliced behavior for best-effort scheduling. + */ + @Override + public long onUpdate(long now) { + long nextUpdate = Long.MAX_VALUE; + + boolean updateNeeded = false; + + // Check if it's time for a round-robin advancement + if (now >= lastRoundRobinUpdate + this.roundRobinInterval) { + updateNeeded = true; + lastRoundRobinUpdate = now; + + // Schedule next round-robin update + nextUpdate = now + this.roundRobinInterval; + } else { + // Schedule the next potential round-robin update + nextUpdate = lastRoundRobinUpdate + this.roundRobinInterval; + } + + // Update demands if needed + if (this.outgoingDemandUpdateNeeded || updateNeeded) { + this.updateOutgoingDemand(); + } + + // Update supplies if needed + if (!this.outgoingSupplies.isEmpty() || updateNeeded) { + this.updateOutgoingSupplies(); + } + + if (this.consumerEdges.isEmpty() || !this.hasSupplierEdges()) { + nextUpdate = Long.MAX_VALUE; + } + + return nextUpdate; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java deleted file mode 100644 index 3a8bebbc..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; - -public interface DistributionPolicy { - double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java deleted file mode 100644 index 53cded87..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -public class DistributionPolicyFactory { - - public enum DistributionPolicyType { - MaxMinFairness, - FixedShare; - } - - public static DistributionPolicy getDistributionStrategy(DistributionPolicyType distributionPolicyType) { - - return switch (distributionPolicyType) { - case MaxMinFairness -> new MaxMinFairnessPolicy(); - case FixedShare -> new FixedShare(1); - // actively misspelling - default -> throw new IllegalArgumentException( - "Unknown distribution strategy type: " + distributionPolicyType); - }; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java new file mode 100644 index 00000000..f58164cf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; + +/** + * A {@link FlowDistributor} that implements the Equal Share distribution policy. + * <p> + * This distributor allocates resources equally among all suppliers and consumers, ensuring that each supplier and + * consumer receives an equal share of the total capacity. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + */ +public class EqualShareFlowDistributor extends FlowDistributor { + + public EqualShareFlowDistributor(FlowEngine engine) { + super(engine); + } + + /** + * Updates the outgoing demand for each supplier edge based on the total capacity. + * The demand is equally distributed among all suppliers. + * This method is called when the outgoing demand needs to be updated. + */ + @Override + protected void updateOutgoingDemand() { + double equalShare = this.capacity / this.supplierEdges.size(); + + for (var supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, equalShare); + } + + this.outgoingDemandUpdateNeeded = false; + } + + /** + * Updates the outgoing supply for each consumer edge based on the total supply. + * The supply is equally distributed among all consumers. + * This method is called when the outgoing supply needs to be updated. + */ + @Override + protected void updateOutgoingSupplies() { + double[] equalShare = distributeSupply(incomingDemands, outgoingSupplies, this.capacity); + + for (var consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, equalShare[consumerEdge.getConsumerIndex()]); + } + } + + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = demands.size(); + double[] allocation = new double[numConsumers]; + double equalShare = totalSupply / numConsumers; + + // Equal share regardless of individual demands + Arrays.fill(allocation, equalShare); + + return allocation; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java new file mode 100644 index 00000000..c0a8cd13 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A {@link FlowDistributor} that implements the First Fit policy for distributing flow. + * + * This distributor allocates resources to consumers based on the first available supply that meets their demand. + * It does not attempt to balance loads or optimize resource usage beyond the first fit principle. + * It tries to place demands on already existing supplies without creating new ones. + * It assumes that resources can be partitioned, if one supplier cannot satisfy the demand, it will try to combine multiple suppliers. + */ +public class FirstFitPolicyFlowDistributor extends FlowDistributor { + + public FirstFitPolicyFlowDistributor(FlowEngine engine) { + super(engine); + } + + /** + * Updates the outgoing demand for suppliers in sequential order by their index. + * With each supplier being allocated up to its full capacity before moving to the next supplier. + */ + @Override + protected void updateOutgoingDemand() { + double remainingDemand = this.totalIncomingDemand; + + // Sort supplier edges by their index to ensure consistent first-fit ordering + var sortedSuppliers = this.supplierEdges.entrySet().stream() + .sorted((e1, e2) -> Integer.compare(e1.getKey(), e2.getKey())) + .toList(); + + // Apply First Fit strategy: fill suppliers in order until demand is satisfied + for (var supplierEntry : sortedSuppliers) { + var supplierEdge = supplierEntry.getValue(); + double supplierCapacity = supplierEdge.getCapacity(); + + if (remainingDemand <= 0) { + // No more demand to allocate + this.pushOutgoingDemand(supplierEdge, 0.0); + } else if (remainingDemand <= supplierCapacity) { + // This supplier can handle all remaining demand + this.pushOutgoingDemand(supplierEdge, remainingDemand); + remainingDemand = 0; + } else { + // This supplier gets filled to capacity, demand continues to next supplier + this.pushOutgoingDemand(supplierEdge, supplierCapacity); + remainingDemand -= supplierCapacity; + } + } + + this.outgoingDemandUpdateNeeded = false; + } + + /** + * Consumers receive their full demanded amount if it can be satisfied by the available supply, + * or zero if it cannot. + */ + @Override + protected void updateOutgoingSupplies() { + ArrayList<Double> currentPossibleSupplies = new ArrayList<>(); + for (var currentIncomingSupply : currentIncomingSupplies.entrySet()) { + currentPossibleSupplies.add(currentIncomingSupply.getValue()); + } + + double[] shares = distributeSupply(incomingDemands, currentPossibleSupplies, totalIncomingSupply); + + for (FlowEdge consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, shares[consumerEdge.getConsumerIndex()]); + } + } + + /** + * Distributes supply among consumers using the First Fit allocation principle. + * Each consumer demand is allocated by trying suppliers in order, potentially + * combining multiple suppliers to satisfy a single demand. + * + * @param demands List of demand values from consumers + * @param currentSupply List of available supply values from suppliers + * @param totalSupply Total amount of supply available (unused in this implementation) + * @return Array of allocation amounts for each consumer + * + * @see #updateOutgoingSupplies() + */ + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = demands.size(); + double[] allocation = new double[numConsumers]; + + // Create a copy of current supply to track remaining capacity as we allocate + ArrayList<Double> remainingSupply = new ArrayList<>(currentSupply); + + // For each demand, try to satisfy it using suppliers in order + for (int i = 0; i < numConsumers; i++) { + double remainingDemand = demands.get(i); + double totalAllocated = 0.0; + + if (remainingDemand > 0) { + // Try each supplier in order until demand is satisfied + for (int j = 0; j < remainingSupply.size() && remainingDemand > 0; j++) { + double availableSupply = remainingSupply.get(j); + + if (availableSupply > 0) { + // Allocate as much as possible from this supplier + double allocatedFromThisSupplier = Math.min(availableSupply, remainingDemand); + + totalAllocated += allocatedFromThisSupplier; + remainingDemand -= allocatedFromThisSupplier; + + // Reduce the remaining supply capacity + remainingSupply.set(j, availableSupply - allocatedFromThisSupplier); + } + } + } + + allocation[i] = totalAllocated; + } + + return allocation; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java deleted file mode 100644 index baa04975..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; - -/** - * A distribution policy that distributes supply equally among all nodes. - * The share can be set to a fixed value, defaulting to 1. - * This policy not implemented yet and is used as a placeholder. - */ -public class FixedShare implements DistributionPolicy { - - private int share; - - public FixedShare() { - this.share = 1; - } - - public FixedShare(int share) { - this.share = share; - } - - @Override - public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) { - return new double[0]; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java new file mode 100644 index 00000000..4c0a84d1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.HashSet; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A {@link FlowDistributor} that implements Fixed Share GPU scheduling. + * + * This distributor allocates a dedicated, consistent portion of GPU resources to each VM (consumer), + * ensuring predictable availability and stable performance. Each active consumer receives a fixed + * share of the total GPU capacity, regardless of their individual demand or the demand of other consumers. + * This policy is heavily inspired by the fixed share GPU scheduling policy used in NVIDIA's MIG (Multi-Instance GPU) technology. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + * Key characteristics: + * - Each consumer gets a fixed percentage of total GPU capacity when active + * - Unused shares (from inactive consumers) remain unallocated, not redistributed + * - Performance remains consistent and predictable for each consumer + * - Share allocation is based on maximum supported consumers, not currently active ones + */ +public class FixedShareFlowDistributor extends FlowDistributor { + + private double fixedShare; + private final double shareRatio; + private int[] notSuppliedConsumers; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Creates a Fixed Share Flow Distributor. + * + * @param engine The flow engine + * @param shareRatio The fixed share ratio for each consumer (between 0.0 and 1.0) + */ + public FixedShareFlowDistributor(FlowEngine engine, double shareRatio) { + super(engine); + + if (shareRatio <= 0 || shareRatio > 1) { + throw new IllegalArgumentException("Share ratio must be between 0.0 and 1.0"); + } + this.shareRatio = shareRatio; + + // Each consumer gets an equal fixed share of the total capacity + this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size(); + + // Initialize tracking for round-robin prioritization + this.notSuppliedConsumers = new int[0]; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Distribution Logic + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Updates the outgoing demand to suppliers. + * In Fixed Share mode, we request the total amount needed to satisfy all active consumers' fixed shares. + */ + @Override + protected void updateOutgoingDemand() { + // Calculate total demand based on active consumers + this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size(); + + // Distribute demand equally across all suppliers + for (FlowEdge supplierEdge : supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.fixedShare); + } + + this.outgoingDemandUpdateNeeded = false; + this.invalidate(); + } + + /** + * Updates the outgoing supplies to consumers. + * Each active consumer receives their fixed share, regardless of their actual demand. + */ + @Override + protected void updateOutgoingSupplies() { + // Calculate the fixed allocation per consumer + + // Distribute to each consumer + int consumerIndex = 0; + if (this.consumerEdges.size() == this.supplierEdges.size()) { + for (FlowEdge consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, this.fixedShare); + } + } else { + double[] supplies = distributeSupply(this.incomingDemands, this.outgoingSupplies, this.totalIncomingSupply); + for (FlowEdge consumerEdge : this.consumerEdges) { + if (supplies[consumerIndex] <= 0.0) { + continue; + } + this.pushOutgoingSupply(consumerEdge, this.fixedShare); + } + } + } + + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + double[] supplies = new double[this.consumerEdges.size()]; + + if (this.consumerEdges.size() < this.supplierEdges.size()) { + for (FlowEdge consumerEdge : this.consumerEdges) { + supplies[consumerEdge.getConsumerIndex()] = this.fixedShare; + } + } else { + // Round-robin approach: prioritize consumers that didn't get resources last time + ArrayList<Integer> currentNotSuppliedList = new ArrayList<>(); + + // Calculate how many consumers we can supply with available resources + int maxConsumersToSupply = (int) Math.floor(totalSupply / this.fixedShare); + int consumersSupplied = 0; + + // First pass: try to supply consumers that were not supplied in the previous round + for (int index : this.notSuppliedConsumers) { + if (index < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply) { + supplies[index] = this.fixedShare; + consumersSupplied++; + } + } + + // Second pass: supply remaining consumers if we still have capacity + for (int i = 0; i < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply; i++) { + if (supplies[i] == 0.0) { // This consumer hasn't been supplied yet + supplies[i] = this.fixedShare; + consumersSupplied++; + } + } + + // Build the list of consumers that didn't get resources this round + for (int i = 0; i < this.consumerEdges.size(); i++) { + if (supplies[i] == 0.0) { + currentNotSuppliedList.add(i); + } + } + + // Update the tracking array for next round + this.notSuppliedConsumers = + currentNotSuppliedList.stream().mapToInt(Integer::intValue).toArray(); + } + + return supplies; + } + + @Override + // index of not supplied consumers also need to be updated + public void removeConsumerEdge(FlowEdge consumerEdge) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + return; + } + + this.totalIncomingDemand -= consumerEdge.getDemand(); + + // Remove idx from consumers that updated their demands + this.updatedDemands.remove(idx); + + this.consumerEdges.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); + + // update the consumer index for all consumerEdges higher than this. + for (int i = idx; i < this.consumerEdges.size(); i++) { + FlowEdge other = this.consumerEdges.get(i); + + other.setConsumerIndex(other.getConsumerIndex() - 1); + } + + HashSet<Integer> newUpdatedDemands = new HashSet<>(); + for (int idx_other : this.updatedDemands) { + if (idx_other > idx) { + newUpdatedDemands.add(idx_other - 1); + } else { + newUpdatedDemands.add(idx_other); + } + } + this.updatedDemands = newUpdatedDemands; + + // Decrease the index of not supplied consumers + for (int i = 0; i < this.notSuppliedConsumers.length; i++) { + if (this.notSuppliedConsumers[i] > idx) { + this.notSuppliedConsumers[i]--; + } + } + + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java new file mode 100644 index 00000000..eb5d4ff7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; + +public class FlowDistributorFactory { + + public enum DistributionPolicy { + BEST_EFFORT, + EQUAL_SHARE, + FIRST_FIT, + FIXED_SHARE, + MAX_MIN_FAIRNESS; + + private final Map<String, Object> properties = new HashMap<>(); + + public void setProperty(String key, Object value) { + properties.put(key, value); + } + + public Object getProperty(String key) { + return properties.get(key); + } + + public <T> T getProperty(String key, Class<T> type) { + return type.cast(properties.get(key)); + } + + public Set<String> getPropertyNames() { + return properties.keySet(); + } + } + + public static FlowDistributor getFlowDistributor(FlowEngine flowEngine, DistributionPolicy distributionPolicyType) { + + return switch (distributionPolicyType) { + case BEST_EFFORT -> new BestEffortFlowDistributor( + flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class)); + case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine); + case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine); + case FIXED_SHARE -> { + if (!distributionPolicyType.getPropertyNames().contains("shareRatio")) { + throw new IllegalArgumentException( + "FixedShare distribution policy requires a 'shareRatio' property to be set."); + } + yield new FixedShareFlowDistributor( + flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class)); + } + default -> new MaxMinFairnessFlowDistributor(flowEngine); + }; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java new file mode 100644 index 00000000..9b48f204 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A flow distributor that implements the max-min fairness distribution policy. + * <p> + * This policy distributes the available supply to consumers in a way that maximizes the minimum supply received by any + * consumer, ensuring fairness across all consumers. + */ +public class MaxMinFairnessFlowDistributor extends FlowDistributor { + + private boolean overloaded = false; + + public MaxMinFairnessFlowDistributor(FlowEngine engine) { + super(engine); + } + + protected void updateOutgoingDemand() { + // equally distribute the demand to all suppliers + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); + // alternatively a relative share could be used, based on capacity minus current incoming supply + // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - + // currentIncomingSupplies.get(idx) / supplierEdges.size())); + } + + this.outgoingDemandUpdateNeeded = false; + + this.invalidate(); + } + + // TODO: This should probably be moved to the distribution strategy + protected void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.totalIncomingSupply) { + this.overloaded = true; + + double[] supplies = this.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), + this.incomingDemands.get(idx), + this.getConsumerResourceType()); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + } + } + } + + this.updatedDemands.clear(); + } + + private record Demand(int idx, double value) {} + + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int inputSize = demands.size(); + + final double[] supplies = new double[inputSize]; + final Demand[] tempDemands = new Demand[inputSize]; + + for (int i = 0; i < inputSize; i++) { + tempDemands[i] = new Demand(i, demands.get(i)); + } + + Arrays.sort(tempDemands, (o1, o2) -> { + Double i1 = o1.value; + Double i2 = o2.value; + return i1.compareTo(i2); + }); + + double availableCapacity = totalSupply; + + for (int i = 0; i < inputSize; i++) { + double d = tempDemands[i].value; + + if (d == 0.0) { + continue; + } + + double availableShare = availableCapacity / (inputSize - i); + double r = Math.min(d, availableShare); + + int idx = tempDemands[i].idx; + supplies[idx] = r; // Update the rates + availableCapacity -= r; + } + + return supplies; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java deleted file mode 100644 index 484e7fe4..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; -import java.util.Arrays; - -/** - * A distribution policy that implements the Max-Min Fairness algorithm. - * This policy distributes supply to demands in a way that maximizes the minimum - * allocation across all demands, ensuring fairness. - */ -public class MaxMinFairnessPolicy implements DistributionPolicy { - private record Demand(int idx, double value) {} - - @Override - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int inputSize = demands.size(); - - final double[] supplies = new double[inputSize]; - final Demand[] tempDemands = new Demand[inputSize]; - - for (int i = 0; i < inputSize; i++) { - tempDemands[i] = new Demand(i, demands.get(i)); - } - - Arrays.sort(tempDemands, (o1, o2) -> { - Double i1 = o1.value; - Double i2 = o2.value; - return i1.compareTo(i2); - }); - - double availableCapacity = totalSupply; - - for (int i = 0; i < inputSize; i++) { - double d = tempDemands[i].value; - - if (d == 0.0) { - continue; - } - - double availableShare = availableCapacity / (inputSize - i); - double r = Math.min(d, availableShare); - - int idx = tempDemands[i].idx; - supplies[idx] = r; // Update the rates - availableCapacity -= r; - } - - return supplies; - } -} |
