From 4181a4bd51b54a5905be1f46f74c1349776e35c2 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 14 Oct 2025 16:38:27 +0200 Subject: Improved the performance by removing many invalidates from FlowNodes (#377) * Updated the UpDatedConsumer to boolean array * Updated SimTraceWorkload to not invalidate when the next fragment is started. * Removed as much invalidates as possible --- .../simulator/telemetry/ComputeMetricReader.kt | 2 +- .../telemetry/parquet/ParquetComputeMonitor.kt | 16 +- .../experiments/base/DistributionPoliciesTest.kt | 2 +- .../opendc/experiments/base/FlowDistributorTest.kt | 264 +++++++++++---------- .../opendc/experiments/base/FragmentScalingTest.kt | 2 +- .../base/VirtualizationOverheadTests.kt | 2 +- .../org/opendc/simulator/compute/cpu/SimCpu.java | 21 +- .../org/opendc/simulator/compute/gpu/SimGpu.java | 29 ++- .../compute/workload/trace/SimTraceWorkload.java | 99 ++++---- .../simulator/engine/engine/FlowCycleQueue.java | 10 +- .../opendc/simulator/engine/engine/FlowEngine.java | 14 +- .../simulator/engine/graph/FlowDistributor.java | 35 ++- .../opendc/simulator/engine/graph/FlowNode.java | 10 +- .../BestEffortFlowDistributor.java | 17 +- .../EqualShareFlowDistributor.java | 5 +- .../MaxMinFairnessFlowDistributor.java | 20 +- 16 files changed, 329 insertions(+), 219 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt index 83899678..59c0ed0e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt @@ -196,7 +196,7 @@ public class ComputeMetricReader( if (toMonitor[OutputFiles.SERVICE] == true) { this.serviceTableReader.record(now) - monitor.record(this.serviceTableReader.copy()) + this.monitor.record(this.serviceTableReader.copy()) } if (printFrequency != null && loggCounter % printFrequency == 0) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt index 4fb930e1..ab893158 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt @@ -44,7 +44,6 @@ public class ParquetComputeMonitor( private val batteryExporter: Exporter?, private val serviceExporter: Exporter?, ) : ComputeMonitor, AutoCloseable { - // FIXME: Include GPU override fun record(reader: HostTableReader) { hostExporter?.write(reader) } @@ -179,16 +178,11 @@ public class ParquetComputeMonitor( } return ParquetComputeMonitor( - hostExporter = - hostExporter, - taskExporter = - taskExporter, - powerSourceExporter = - powerSourceExporter, - batteryExporter = - batteryExporter, - serviceExporter = - serviceExporter, + hostExporter = hostExporter, + taskExporter = taskExporter, + powerSourceExporter = powerSourceExporter, + batteryExporter = batteryExporter, + serviceExporter = serviceExporter, ) } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt index 730f9fd0..0bc0bc88 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/DistributionPoliciesTest.kt @@ -108,7 +108,7 @@ class DistributionPoliciesTest { assertAll( // single gpu { assertEquals(2000.0, singleMonitor.taskGpuDemands[0]?.get(1), "Single GPU demand in task \"0\" should be 2000.0") }, - { assertEquals(4000.0, singleMonitor.taskGpuSupplied[0]?.get(1), "Single GPU demand in task \"0\" should be 2000.0") }, + { assertEquals(4000.0, singleMonitor.taskGpuSupplied[0]?.get(1), "Single GPU supplied in task \"0\" should be 4000.0") }, { assertEquals( 4000.0, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt index 53ff068e..9ed08d3a 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -59,12 +59,12 @@ class FlowDistributorTest { assertAll( { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -95,12 +95,12 @@ class FlowDistributorTest { assertAll( { assertEquals(3000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -131,12 +131,12 @@ class FlowDistributorTest { assertAll( { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -167,12 +167,12 @@ class FlowDistributorTest { assertAll( { assertEquals(4000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -203,12 +203,12 @@ class FlowDistributorTest { assertAll( { assertEquals(4000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -248,16 +248,16 @@ class FlowDistributorTest { assertAll( { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuDemands[1]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -297,16 +297,16 @@ class FlowDistributorTest { assertAll( { assertEquals(6000.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(5000.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(5000.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(6000.0, monitor.taskCpuDemands[1]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -348,22 +348,22 @@ class FlowDistributorTest { { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[0]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands[1]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(6)) { "The cpu supplied to task 1 is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -407,20 +407,20 @@ class FlowDistributorTest { { assertEquals(3000.0, monitor.taskCpuDemands[0]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[0]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(1500.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(1500.0, monitor.taskCpuDemands[1]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied[1]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied[1]?.get(6)) { "The cpu supplied to task 1 is incorrect" } }, { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -464,26 +464,26 @@ class FlowDistributorTest { { assertEquals(1500.0, monitor.taskCpuDemands[0]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2500.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(14)) { "The cpu demanded is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied[0]?.get(5)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(14)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[1]?.get(5)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[1]?.get(9)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuDemands[1]?.get(14)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied[1]?.get(5)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(9)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(14)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied[1]?.get(5)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[1]?.get(9)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied[1]?.get(14)) { "The cpu supplied to task 1 is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(5500.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu supplied to the host is incorrect" } }, ) } @@ -554,9 +554,11 @@ class FlowDistributorTest { */ @Test fun testFlowDistributor13() { + val numTasks = 1000 + val workload: ArrayList = arrayListOf().apply { - repeat(1000) { + repeat(numTasks) { this.add( createTestTask( id = 0, @@ -572,7 +574,7 @@ class FlowDistributorTest { val monitor = runTest(topology, workload) assertAll( - { assertEquals(1000 * 10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } }, + { assertEquals(numTasks * 10 * 60 * 1000L, monitor.maxTimestamp) { "The expected runtime is exceeded" } }, ) } @@ -605,24 +607,24 @@ class FlowDistributorTest { // task { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(1)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(10)) { "The cpu supplied to task 0 is incorrect" } }, // host { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, // GPU // task { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(1)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskGpuDemands[0]?.get(10)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(1)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(10)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(1)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(10)) { "The gpu supplied to task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } @@ -654,24 +656,24 @@ class FlowDistributorTest { // task { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, // GPU // task { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu supplied to task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } @@ -703,24 +705,24 @@ class FlowDistributorTest { // task { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, // GPU // task { assertEquals(2000.0, monitor.taskGpuDemands[0]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskGpuDemands[0]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu supplied to task 0 is incorrect" } }, // host { assertEquals(2000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } @@ -750,24 +752,24 @@ class FlowDistributorTest { // task { assertEquals(2000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, // host { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, // GPU // task { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(1)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(1)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(1)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu supplied to task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } @@ -805,40 +807,40 @@ class FlowDistributorTest { assertAll( // CPU // task 0 - { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 at t=0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 at t=0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu supplied to task 0 at t=0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 at t=0 is incorrect" } }, // task 1 - { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuDemands[1]?.get(10)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(19)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(19)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(1)) { "The cpu demanded by task 1 at t=1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands[1]?.get(10)) { "The cpu demanded by task 1 at t=10 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(19)) { "The cpu demanded by task 1 at t=19 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(1)) { "The cpu supplied to task 1 at t=1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[1]?.get(10)) { "The cpu supplied to task 1 at t=10 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(19)) { "The cpu supplied to task 1 at t=9 is incorrect" } }, // host - { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host at t=1 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host at t=10 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host at t=1 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host at t=10 is incorrect" } }, // GPU // task 0 { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskGpuDemands[0]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu supplied to task 0 is incorrect" } }, // task 1 { assertEquals(0.0, monitor.taskGpuDemands[1]?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, { assertEquals(1000.0, monitor.taskGpuDemands[1]?.get(10)) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuDemands[1]?.get(19)) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskGpuSupplied[1]?.get(0)) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(10)) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(19)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands[1]?.get(19)) { "The gpu supplied to task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied[1]?.get(0)) { "The gpu supplied to task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(10)) { "The gpu supplied to task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(19)) { "The gpu supplied to task 1 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } @@ -878,34 +880,34 @@ class FlowDistributorTest { // task 0 { assertEquals(1000.0, monitor.taskCpuDemands[0]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[0]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied[0]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[0]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, // task 1 { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(0.0, monitor.taskCpuDemands[1]?.get(9)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(0)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(9)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskCpuSupplied[1]?.get(9)) { "The cpu supplied to task 1 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu supplied to the host is incorrect" } }, // GPU // task 0 { assertEquals(0.0, monitor.taskGpuDemands[0]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, { assertEquals(0.0, monitor.taskGpuDemands[0]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied[0]?.get(0)) { "The gpu supplied to task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied[0]?.get(9)) { "The gpu supplied to task 0 is incorrect" } }, // task 1 { assertEquals(1000.0, monitor.taskGpuDemands[1]?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, { assertEquals(1000.0, monitor.taskGpuDemands[1]?.get(9)) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(0)) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(9)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(0)) { "The gpu supplied to task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied[1]?.get(9)) { "The gpu supplied to task 1 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu supplied to the host is incorrect" } }, + { assertEquals(0.0, monitor.hostGpuSupplied["H01"]?.get(10)?.get(0)) { "The gpu supplied to the host is incorrect" } }, ) } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt index ce26cc3a..316b0f91 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt @@ -71,8 +71,8 @@ class FragmentScalingTest { ) val topology = createTopology("single_1_2000.json") - val monitorNoDelay = runTest(topology, workloadNoDelay) val monitorPerfect = runTest(topology, workloadPerfect) + val monitorNoDelay = runTest(topology, workloadNoDelay) assertAll( { assertEquals(1200000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/VirtualizationOverheadTests.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/VirtualizationOverheadTests.kt index 3aa6c354..17db8d27 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/VirtualizationOverheadTests.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/VirtualizationOverheadTests.kt @@ -191,7 +191,7 @@ class VirtualizationOverheadTests { } /** - * Test that the share-based overhead model does not applies the correct amount of overhead, depending on the number of VMs. + * Test that the share-based overhead model does not apply the correct amount of overhead, depending on the number of VMs. */ @Test fun shareBasedVirtualizationOverheadModelTest() { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index 52fc6093..91b5eabf 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -50,6 +50,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer private final PowerModel cpuPowerModel; + private double previousPowerDemand = 0.0f; + private double currentCpuDemand = 0.0f; // cpu capacity demanded by the mux private double currentCpuUtilization = 0.0f; private double currentCpuSupplied = 0.0f; // cpu capacity supplied to the mux @@ -228,6 +230,10 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newCpuDemand) { + if (newCpuDemand == this.currentCpuDemand) { + return; + } + updateCounters(); this.currentCpuDemand = newCpuDemand; @@ -236,7 +242,16 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer // Calculate Power Demand and send to PSU this.currentPowerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); - this.invalidate(); + // TODO: find a better solution for this + // If current Power Demand is equal to previous Power Demand, it means the CPU is overloaded and we can + // distribute + // immediately. + if (this.currentPowerDemand == this.previousPowerDemand) { + this.pushOutgoingSupply(consumerEdge, this.currentCpuSupplied); + } else { + this.previousPowerDemand = this.currentPowerDemand; + this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand); + } } /** @@ -247,7 +262,9 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer updateCounters(); this.currentPowerSupplied = newPowerSupply; - this.invalidate(); + this.currentCpuSupplied = Math.min(this.currentCpuDemand, this.maxCapacity); + + this.pushOutgoingSupply(this.distributorEdge, this.currentCpuSupplied, ResourceType.CPU); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/gpu/SimGpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/gpu/SimGpu.java index a5fccf6c..e0ed64d0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/gpu/SimGpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/gpu/SimGpu.java @@ -50,6 +50,8 @@ public final class SimGpu extends FlowNode implements FlowSupplier, FlowConsumer private final PowerModel gpuPowerModel; + private double previousPowerDemand = 0.0f; + private double currentGpuDemand = 0.0f; // gpu capacity demanded by the mux private double currentGpuUtilization = 0.0f; private double currentGpuSupplied = 0.0f; // gpu capacity supplied to the mux @@ -239,7 +241,16 @@ public final class SimGpu extends FlowNode implements FlowSupplier, FlowConsumer // Calculate Power Demand and send to PSU this.currentPowerDemand = this.gpuPowerModel.computePower(this.currentGpuUtilization); - this.invalidate(); + // TODO: find a better solution for this + // If current Power Demand is equal to previous Power Demand, it means the CPU is overloaded and we can + // distribute + // immediately. + if (this.currentPowerDemand == this.previousPowerDemand) { + this.pushOutgoingSupply(consumerEdge, this.currentGpuSupplied); + } else { + this.previousPowerDemand = this.currentPowerDemand; + this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand); + } } /** @@ -260,7 +271,16 @@ public final class SimGpu extends FlowNode implements FlowSupplier, FlowConsumer // Calculate Power Demand and send to PSU this.currentPowerDemand = this.gpuPowerModel.computePower(this.currentGpuUtilization); - this.invalidate(); + // TODO: find a better solution for this + // If current Power Demand is equal to previous Power Demand, it means the CPU is overloaded and we can + // distribute + // immediately. + if (this.currentPowerDemand == this.previousPowerDemand) { + this.pushOutgoingSupply(consumerEdge, this.currentGpuSupplied); + } else { + this.previousPowerDemand = this.currentPowerDemand; + this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand); + } } /** @@ -271,7 +291,10 @@ public final class SimGpu extends FlowNode implements FlowSupplier, FlowConsumer updateCounters(); this.currentPowerSupplied = newPowerSupply; - this.invalidate(); + this.currentGpuSupplied = virtualizationOverheadModel.getSupply( + Math.min(this.currentGpuDemand, this.maxCapacity), this.consumerCount); + + this.pushOutgoingSupply(this.distributorEdge, this.currentGpuSupplied, ResourceType.CPU); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 70fe7e96..ff65fbf2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.workload.trace; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -53,8 +54,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private final double[] resourcesSupplied = new double[ResourceType.values().length]; // the currently supplied resources - private final double[] newResourcesSupply = - new double[ResourceType.values().length]; // The supplied resources with next update private final double[] resourcesDemand = new double[ResourceType.values().length]; // The demands per resource type private final double[] remainingWork = new double[ResourceType.values().length]; // The duration of the fragment at the demanded speeds @@ -116,9 +115,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { new FlowEdge(this, supplier); if (supplier instanceof VirtualMachine) { // instead iterate over the resources in the fragment as required resources not provided by the VM - for (ResourceType resourceType : workload.getResourceTypes()) { - this.usedResourceTypes.add(resourceType); - } + this.usedResourceTypes.addAll(Arrays.asList(workload.getResourceTypes())); } } @@ -157,11 +154,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return true; } - @Override - public long onUpdate(long now) { - long passedTime = getPassedTime(now); - this.startOfFragment = now; - + // Update the remaining work for all resources based on the time passed since last update + private void updateRemainingWork(long passedTime) { for (ResourceType resourceType : this.usedResourceTypes) { // The amount of work done since last update double finishedWork = this.scalingPolicy.getFinishedWork( @@ -172,34 +166,17 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // TODO: maybe remove Math.max, as as we are already checking for <= 0 this.remainingWork[resourceType.ordinal()] = Math.max(0, this.remainingWork[resourceType.ordinal()] - finishedWork); + this.totalRemainingWork -= finishedWork; + if (this.remainingWork[resourceType.ordinal()] <= 0) { this.workloadFinished[resourceType.ordinal()] = true; } } + } - // If this.totalRemainingWork <= 0, the fragment has been completed across all resources - if ((int) this.totalRemainingWork <= 0 && this.isWorkloadFinished()) { - this.startNextFragment(); - - this.invalidate(); - return Long.MAX_VALUE; - } - - for (ResourceType resourceType : this.usedResourceTypes) { - if (this.machineResourceEdges[resourceType.ordinal()] != null) { - this.pushOutgoingDemand( - this.machineResourceEdges[resourceType.ordinal()], - this.resourcesDemand[resourceType.ordinal()], - resourceType); - } - } - - // Update the supplied resources - for (ResourceType resourceType : this.usedResourceTypes) { - this.resourcesSupplied[resourceType.ordinal()] = this.newResourcesSupply[resourceType.ordinal()]; - } - + // Determine the next update time based on the remaining work and supplied resources + private long getNextUpdateTime(long now) { long timeUntilNextUpdate = Long.MIN_VALUE; for (ResourceType resourceType : this.usedResourceTypes) { @@ -226,7 +203,41 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { } } - long nextUpdate = timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; + return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; + } + + private void pushNewDemands() { + for (ResourceType resourceType : this.usedResourceTypes) { + if (this.machineResourceEdges[resourceType.ordinal()] != null) { + this.pushOutgoingDemand( + this.machineResourceEdges[resourceType.ordinal()], + this.resourcesDemand[resourceType.ordinal()], + resourceType); + } + } + } + + @Override + public long onUpdate(long now) { + long passedTime = getPassedTime(now); + this.startOfFragment = now; + + this.updateRemainingWork(passedTime); + + // If this.totalRemainingWork <= 0, the fragment has been completed across all resources + if ((int) this.totalRemainingWork <= 0 && this.isWorkloadFinished()) { + this.startNextFragment(); + + if (this.nodeState == NodeState.CLOSING || this.nodeState == NodeState.CLOSED) { + return Long.MAX_VALUE; + } + + return getNextUpdateTime(this.startOfFragment); + } + + this.pushNewDemands(); + + long nextUpdate = getNextUpdateTime(this.startOfFragment); // if for all resources the remaining work is 0, then invalidate the workload, to reschedule the next fragment if (nextUpdate == now + Long.MIN_VALUE) { @@ -257,7 +268,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Reset the remaining work for all resources this.totalRemainingWork = 0.0; - // TODO: only acceleration is considered, not memory for (ResourceType resourceType : usedResourceTypes) { double demand = nextFragment.getResourceUsage(resourceType); @@ -384,10 +394,10 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { if (this.resourcesSupplied[suppliedResourceType.ordinal()] == newSupply) { return; } - this.resourcesSupplied[suppliedResourceType.ordinal()] = - this.newResourcesSupply[suppliedResourceType.ordinal()]; - this.newResourcesSupply[suppliedResourceType.ordinal()] = newSupply; + this.resourcesSupplied[suppliedResourceType.ordinal()] = newSupply; + + // TODO: Change this to just update deadline this.invalidate(); } @@ -406,10 +416,19 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { if (this.resourcesSupplied[resourceType.ordinal()] == newSupply) { return; } - this.resourcesSupplied[resourceType.ordinal()] = this.newResourcesSupply[resourceType.ordinal()]; - this.newResourcesSupply[resourceType.ordinal()] = newSupply; - this.invalidate(); + this.resourcesSupplied[resourceType.ordinal()] = newSupply; + + long now = this.clock.millis(); + this.startOfFragment = now; + long passedTime = getPassedTime(now); + + this.updateRemainingWork(passedTime); + long next_deadline = this.getNextUpdateTime(now); + + // Remove stage from the timer queue + this.setDeadline(next_deadline); + this.engine.scheduleDelayedInContext(this); } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java index 72dd217c..b0a37d0e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java @@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.HashMap; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -32,7 +33,8 @@ import org.opendc.simulator.engine.graph.FlowNode; *

* By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class FlowCycleQueue { +public final class FlowCycleQueue { + /** * The array of elements in the queue. */ @@ -45,10 +47,16 @@ final class FlowCycleQueue { nodeQueue = new FlowNode[initialCapacity]; } + public final HashMap nodeTypeCounter = new HashMap<>(); /** * Add the specified context to the queue. */ void add(FlowNode ctx) { + + String nodeType = ctx.getClass().getSimpleName(); + nodeTypeCounter.putIfAbsent(nodeType, 0); + nodeTypeCounter.put(nodeType, nodeTypeCounter.get(nodeType) + 1); + if (ctx.getInCycleQueue()) { return; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index e5dbb7a8..9bbfa777 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine; import java.time.Clock; import java.time.InstantSource; +import java.util.LinkedList; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; import org.opendc.simulator.engine.graph.FlowNode; @@ -38,7 +39,9 @@ public final class FlowEngine implements Runnable { /** * The queue of {@link FlowNode} updates that need to be updated in the current cycle. */ - private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); + // private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); + + private final LinkedList cycleQueue = new LinkedList<>(); /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. @@ -139,16 +142,14 @@ public final class FlowEngine implements Runnable { * Run all the enqueued actions for the specified timestamp (now). */ private void doRunEngine(long now) { - final FlowCycleQueue cycleQueue = this.cycleQueue; - final FlowEventQueue eventQueue = this.eventQueue; - try { // Mark the engine as active to prevent concurrent calls to this method active = true; + // int EventCount = 0; // Execute all scheduled updates at current timestamp while (true) { - final FlowNode ctx = eventQueue.poll(now); + final FlowNode ctx = this.eventQueue.poll(now); if (ctx == null) { break; } @@ -158,13 +159,14 @@ public final class FlowEngine implements Runnable { // Execute all immediate updates while (true) { - final FlowNode ctx = cycleQueue.poll(); + final FlowNode ctx = this.cycleQueue.poll(); if (ctx == null) { break; } ctx.update(now); } + } finally { active = false; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index cb2a3ba6..39712f20 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -55,7 +55,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, protected final FlowEdge[] consumerEdges; protected final double[] incomingDemands; // What is demanded by the consumers protected final double[] outgoingSupplies; // What is supplied to the consumers - protected ArrayList updatedDemands = new ArrayList<>(); + + protected final boolean[] updatedDemands; + protected int numUpdatedDemands = 0; + // protected ArrayList updatedDemands = new ArrayList<>(); protected double previousTotalDemand = 0.0; protected double totalIncomingDemand; // The total demand of all the consumers @@ -73,6 +76,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, protected double capacity; // What is the max capacity. Can probably be removed + protected static HashMap updateMap = new HashMap(); + + protected boolean overloaded = false; + public FlowDistributor(FlowEngine engine, int maxConsumers) { super(engine); @@ -91,6 +98,8 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.incomingDemands = new double[this.maxConsumers]; this.outgoingSupplies = new double[this.maxConsumers]; + + this.updatedDemands = new boolean[this.maxConsumers]; } public double getTotalIncomingDemand() { @@ -109,13 +118,13 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, // Check if current supply is different from total demand if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); return Long.MAX_VALUE; } - // TODO: look into whether this is always needed - if (this.numConsumers > 0) { + if (this.numUpdatedDemands > 0 || this.overloaded) { this.updateOutgoingSupplies(); } @@ -141,7 +150,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.numConsumers++; this.consumerEdges[consumerIndex] = consumerEdge; - this.outgoingDemandUpdateNeeded = true; } @Override @@ -169,9 +177,11 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // Remove idx from consumers that updated their demands - if (this.updatedDemands.contains(consumerIndex)) { - this.updatedDemands.remove(Integer.valueOf(consumerIndex)); - } + // if (this.updatedDemands.contains(consumerIndex)) { + // this.updatedDemands.remove(Integer.valueOf(consumerIndex)); + // } + + this.updatedDemands[consumerIndex] = false; this.consumerEdges[consumerIndex] = null; this.incomingDemands[consumerIndex] = 0.0; @@ -196,7 +206,9 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.currentIncomingSupplies.put(idx, 0.0); if (this.supplierEdges.isEmpty()) { - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } } @@ -220,9 +232,12 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // TODO: can be optimized by using a boolean array - this.updatedDemands.add(consumerIndex); + // this.updatedDemands.add(consumerIndex); + this.updatedDemands[consumerIndex] = true; + this.numUpdatedDemands++; this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @@ -246,6 +261,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.totalIncomingSupply += (newSupply - prevSupply); this.outgoingSupplyUpdateNeeded = true; + this.invalidate(); } @@ -281,7 +297,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, @Override public ResourceType getSupplierResourceType() { - // return this.supplierEdge.getSupplierResourceType(); return this.supplierResourceType; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index cbfe39a3..ff6ad6f0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -137,10 +137,14 @@ public abstract class FlowNode { // If there is already an update running, // notify the update, that a next update should be run after - if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) { - this.nodeState = NodeState.INVALIDATED; - engine.scheduleImmediate(now, this); + if (this.nodeState == NodeState.CLOSING + || this.nodeState == NodeState.CLOSED + || this.nodeState == NodeState.INVALIDATED) { + return; } + + this.nodeState = NodeState.INVALIDATED; + engine.scheduleImmediate(now, this); } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java index 04317d6a..5446f261 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java @@ -23,6 +23,7 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; +import java.util.Arrays; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; @@ -143,17 +144,29 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Update the supplies of the consumers that changed their demand in the current cycle else { - for (int consumerIndex : this.updatedDemands) { + for (int consumerIndex = 0; consumerIndex < this.numConsumers; consumerIndex++) { + if (!this.updatedDemands[consumerIndex]) { + continue; + } this.pushOutgoingSupply( this.consumerEdges[consumerIndex], this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } + + // for (int consumerIndex : this.updatedDemands) { + // this.pushOutgoingSupply( + // this.consumerEdges[consumerIndex], + // this.incomingDemands[consumerIndex], + // this.getConsumerResourceType()); + // } } } this.outgoingSupplyUpdateNeeded = false; - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java index 89b1b314..6938f7d7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java @@ -54,8 +54,6 @@ public class EqualShareFlowDistributor extends FlowDistributor { } this.outgoingDemandUpdateNeeded = false; - this.updatedDemands.clear(); - this.invalidate(); } /** @@ -72,6 +70,9 @@ public class EqualShareFlowDistributor extends FlowDistributor { this.pushOutgoingSupply( this.consumerEdges[consumerIndex], equalShare[consumerIndex], this.getConsumerResourceType()); } + + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } @Override diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java index e5e4eb59..b8337b7a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -36,8 +36,6 @@ import org.opendc.simulator.engine.graph.FlowEdge; */ public class MaxMinFairnessFlowDistributor extends FlowDistributor { - private boolean overloaded = false; - public MaxMinFairnessFlowDistributor(FlowEngine engine, int maxConsumers) { super(engine, maxConsumers); } @@ -63,6 +61,7 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // If the demand is higher than the current supply, the system is overloaded. // The available supply is distributed based on the current distribution function. + // FIXME: There can a problem that the incoming supply is ony 11 decimal numbers and thus is smaller. if (this.totalIncomingDemand > this.totalIncomingSupply) { this.overloaded = true; @@ -95,16 +94,29 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // Update the supplies of the consumers that changed their demand in the current cycle else { - for (int consumerIndex : this.updatedDemands) { + for (int consumerIndex : this.usedConsumerIndices) { + if (!this.updatedDemands[consumerIndex]) { + continue; + } this.pushOutgoingSupply( this.consumerEdges[consumerIndex], this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } + + // + // for (int consumerIndex : this.updatedDemands) { + // this.pushOutgoingSupply( + // this.consumerEdges[consumerIndex], + // this.incomingDemands[consumerIndex], + // this.getConsumerResourceType()); + // } } } - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } private record Demand(int idx, double value) {} -- cgit v1.2.3