summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-11-29 13:54:31 +0100
committerGitHub <noreply@github.com>2024-11-29 13:54:31 +0100
commita49a3878758438fe8d04bf4c4d3e3ffc5873aace (patch)
treed89f2fcc058a9b23b798c29402f8b8fd69beca41
parent124b40ce36fa03c5275e12ff5a020fc40fe5fd5a (diff)
Multiplexer update (#278)
* Fixed the Multiplexer.java to properly divide the supply over the different consumers. Fixed a bug where fragments were being loaded in reversed order. * Optimized the Multiplexer.java, by only updating the supply of the consumer that updated its demand when possible.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt26
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java114
8 files changed, 123 insertions, 65 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java
index 97202104..61c4145c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java
@@ -40,4 +40,5 @@ public record GuestCpuStats(
long lostTime,
double capacity,
double usage,
+ double demand,
double utilization) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 3a923222..3a2416a8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -233,6 +233,7 @@ public class Guest(
counters.cpuLostTime / 1000L,
counters.cpuCapacity,
counters.cpuSupply,
+ counters.cpuDemand,
counters.cpuSupply / cpuLimit,
)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
index 6658e444..cf315947 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
@@ -97,6 +97,16 @@ public object DfltTaskExportColumns {
field = Types.required(FLOAT).named("cpu_limit"),
) { it.cpuLimit }
+ public val CPU_USAGE: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_usage"),
+ ) { it.cpuUsage }
+
+ public val CPU_DEMAND: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_demand"),
+ ) { it.cpuDemand }
+
public val CPU_TIME_ACTIVE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_active"),
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
index 50ffa5fc..0a752e75 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
@@ -100,6 +100,16 @@ public interface TaskTableReader : Exportable {
public val cpuLimit: Double
/**
+ * The CPU given to this task (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The CPU demanded by this task (in MHz).
+ */
+ public val cpuDemand: Double
+
+ /**
* The duration (in seconds) that a CPU was active in the task.
*/
public val cpuActiveTime: Long
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt
index 5a0897f7..edcc8b20 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt
@@ -55,6 +55,8 @@ public class TaskTableReaderImpl(
_timestampAbsolute = table.timestampAbsolute
_cpuLimit = table.cpuLimit
+ _cpuDemand = table.cpuDemand
+ _cpuUsage = table.cpuUsage
_cpuActiveTime = table.cpuActiveTime
_cpuIdleTime = table.cpuIdleTime
_cpuStealTime = table.cpuStealTime
@@ -128,6 +130,14 @@ public class TaskTableReaderImpl(
get() = _cpuLimit
private var _cpuLimit = 0.0
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
override val cpuActiveTime: Long
get() = _cpuActiveTime - previousCpuActiveTime
private var _cpuActiveTime = 0L
@@ -181,14 +191,16 @@ public class TaskTableReaderImpl(
_timestampAbsolute = now + startTime
_cpuLimit = cpuStats?.capacity ?: 0.0
- _cpuActiveTime = cpuStats?.activeTime ?: 0
- _cpuIdleTime = cpuStats?.idleTime ?: 0
- _cpuStealTime = cpuStats?.stealTime ?: 0
- _cpuLostTime = cpuStats?.lostTime ?: 0
- _uptime = sysStats?.uptime?.toMillis() ?: 0
- _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _cpuDemand = cpuStats?.demand ?: 0.0
+ _cpuUsage = cpuStats?.usage ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: _cpuActiveTime
+ _cpuIdleTime = cpuStats?.idleTime ?: _cpuIdleTime
+ _cpuStealTime = cpuStats?.stealTime ?: _cpuStealTime
+ _cpuLostTime = cpuStats?.lostTime ?: _cpuLostTime
+ _uptime = sysStats?.uptime?.toMillis() ?: _uptime
+ _downtime = sysStats?.downtime?.toMillis() ?: _downtime
_provisionTime = task.launchedAt
- _bootTime = sysStats?.bootTime
+ _bootTime = sysStats?.bootTime ?: _bootTime
_creationTime = task.createdAt
_finishTime = task.finishedAt
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
index 9ce25c99..10478174 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
@@ -253,9 +253,9 @@ class ScenarioIntegrationTest {
assertAll(
{ assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } },
{ assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } },
- { assertEquals(4297000, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(5003000, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
@@ -294,8 +294,8 @@ class ScenarioIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(1803918431, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(787181569, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
@@ -341,8 +341,8 @@ class ScenarioIntegrationTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
- { assertEquals(43101787498, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(3489412502, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
index d3edc957..63331a6c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
@@ -40,8 +40,9 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
private double currentCpuDemand = 0.0f; // cpu capacity demanded by the mux
private double currentCpuUtilization = 0.0f;
- private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentCpuSupplied = 0.0f; // cpu capacity supplied to the mux
+
+ private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentPowerSupplied = 0.0f; // cpu capacity supplied by the psu
private double maxCapacity;
@@ -122,7 +123,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
updateCounters(now);
- this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
+ this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
@@ -132,7 +133,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
}
// Calculate the amount of cpu this can provide
- double cpuSupply = this.currentCpuDemand;
+ double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
@@ -205,6 +206,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
this.currentCpuDemand = newCpuDemand;
this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
+ this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
+
// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
@@ -223,7 +226,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
this.currentPowerSupplied = newPowerSupply;
// Calculate the amount of cpu this can provide
- double cpuSupply = this.currentCpuDemand;
+ double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
+ ;
if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
index 8cd2fa6f..ece90c20 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
@@ -39,6 +39,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
private double totalDemand; // The total demand of all the consumers
private double totalSupply; // The total supply from the supplier
+
+ private boolean overProvisioned = false;
+ private int currentConsumerIdx = -1;
+
private double capacity; // What is the max capacity
public Multiplexer(FlowGraph graph) {
@@ -59,52 +63,63 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
- if (this.totalDemand > this.capacity) {
- redistributeSupply(this.consumerEdges, this.supplies, this.capacity);
- } else {
- for (int i = 0; i < this.demands.size(); i++) {
- this.supplies.set(i, this.demands.get(i));
+ return Long.MAX_VALUE;
+ }
+
+ private void distributeSupply() {
+ // if supply >= demand -> push supplies to all tasks
+ // TODO: possible optimization -> Only has to be done for the specific consumer that changed demand
+ if (this.totalSupply >= this.totalDemand) {
+
+ // If this came from a state of over provisioning, provide all consumers with their demand
+ if (this.overProvisioned) {
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
+ }
}
- }
- double totalSupply = 0;
- for (int i = 0; i < this.consumerEdges.size(); i++) {
- this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i));
- totalSupply += this.supplies.get(i);
+ if (this.currentConsumerIdx != -1) {
+ this.pushSupply(
+ this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
+ this.currentConsumerIdx = -1;
+ }
+
+ this.overProvisioned = false;
}
- // Only update supplier if supply has changed
- if (this.totalSupply != totalSupply) {
- this.totalSupply = totalSupply;
+ // if supply < demand -> distribute the supply over all consumers
+ else {
+ this.overProvisioned = true;
+ double[] supplies = redistributeSupply(this.demands, this.totalSupply);
- pushDemand(this.supplierEdge, this.totalSupply);
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
+ }
}
-
- return Long.MAX_VALUE;
}
- private static double redistributeSupply(
- ArrayList<FlowEdge> consumerEdges, ArrayList<Double> supplies, double capacity) {
- final long[] consumers = new long[consumerEdges.size()];
+ private record Demand(int idx, double value) {}
- for (int i = 0; i < consumers.length; i++) {
- FlowEdge consumer = consumerEdges.get(i);
+ private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
+ int inputSize = demands.size();
- if (consumer == null) {
- break;
- }
+ final double[] supplies = new double[inputSize];
+ final Demand[] tempDemands = new Demand[inputSize];
- consumers[i] = (Double.doubleToRawLongBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
+ for (int i = 0; i < inputSize; i++) {
+ tempDemands[i] = new Demand(i, demands.get(i));
}
- Arrays.sort(consumers);
- double availableCapacity = capacity;
- int inputSize = consumers.length;
+ Arrays.sort(tempDemands, (o1, o2) -> {
+ Double i1 = o1.value;
+ Double i2 = o2.value;
+ return i1.compareTo(i2);
+ });
+
+ double availableCapacity = totalSupply; // totalSupply
for (int i = 0; i < inputSize; i++) {
- long v = consumers[i];
- int slot = (int) v;
- double d = Double.longBitsToDouble((int) (v >> 32));
+ double d = tempDemands[i].value;
if (d == 0.0) {
continue;
@@ -113,12 +128,13 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
double availableShare = availableCapacity / (inputSize - i);
double r = Math.min(d, availableShare);
- supplies.set(slot, r); // Update the rates
+ int idx = tempDemands[i].idx;
+ supplies[idx] = r; // Update the rates
availableCapacity -= r;
}
// Return the used capacity
- return capacity - availableCapacity;
+ return supplies;
}
/**
@@ -132,8 +148,6 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
this.consumerEdges.add(consumerEdge);
this.demands.add(0.0);
this.supplies.add(0.0);
-
- this.invalidate();
}
@Override
@@ -141,8 +155,6 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
this.totalSupply = 0;
-
- this.invalidate();
}
@Override
@@ -164,7 +176,9 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
this.consumerEdges.get(i).setConsumerIndex(i);
}
- this.invalidate();
+ this.currentConsumerIdx = -1;
+
+ this.pushDemand(this.supplierEdge, this.totalDemand);
}
@Override
@@ -178,28 +192,34 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
public void handleDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();
+ this.currentConsumerIdx = idx;
+
if (idx == -1) {
System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer");
return;
}
+ // Update the total demand (This is cheaper than summing over all demands)
double prevDemand = demands.get(idx);
- demands.set(idx, newDemand);
+ demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);
- if (this.totalDemand <= this.capacity) {
-
- this.totalSupply = this.totalDemand;
- this.pushDemand(this.supplierEdge, this.totalSupply);
-
- this.pushSupply(consumerEdge, newDemand);
+ if (overProvisioned) {
+ distributeSupply();
}
- // TODO: add behaviour if capacity is reached
+
+ // Send new totalDemand to CPU
+ // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
+ this.pushDemand(this.supplierEdge, this.totalDemand);
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {}
+ public void handleSupply(FlowEdge supplierEdge, double newSupply) {
+ this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers
+
+ this.distributeSupply();
+ }
@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {