From e22c97dcca7478d6941b78bdf7cd873bc0d23cdc Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 22 Jul 2025 15:47:44 +0200 Subject: Updated workload schema (#360) --- .../compute/workload/trace/SimTraceWorkload.java | 10 ++-- .../compute/workload/trace/TraceFragment.java | 29 +++-------- .../compute/workload/trace/TraceWorkload.java | 60 ++++++---------------- .../simulator/engine/graph/FlowDistributor.java | 6 ++- .../MaxMinFairnessFlowDistributor.java | 4 -- 5 files changed, 31 insertions(+), 78 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 8b3a7188..95487476 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -66,7 +66,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private final TraceWorkload snapshot; private final ScalingPolicy scalingPolicy; - private final String taskName; + private final int taskId; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters @@ -107,7 +107,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.scalingPolicy = workload.getScalingPolicy(); this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; - this.taskName = workload.getTaskName(); + this.taskId = workload.getTaskId(); this.startOfFragment = this.clock.millis(); @@ -135,7 +135,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.scalingPolicy = workload.getScalingPolicy(); this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; - this.taskName = workload.getTaskName(); + this.taskId = workload.getTaskId(); this.startOfFragment = this.clock.millis(); @@ -325,9 +325,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { TraceFragment newFragment = new TraceFragment( remainingDuration, currentFragment.cpuUsage(), - currentFragment.cpuCoreCount(), currentFragment.gpuUsage(), - currentFragment.gpuCoreCount(), currentFragment.gpuMemoryUsage()); // Alter the snapshot by removing finished fragments @@ -340,9 +338,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { TraceFragment snapshotFragment = new TraceFragment( this.checkpointDuration, this.snapshot.getMaxCpuDemand(), - this.snapshot.getMaxCoreCount(), this.snapshot.getMaxGpuDemand(), - this.snapshot.getMaxGpuCoreCount(), this.snapshot.getMaxGpuMemoryDemand()); this.remainingFragments.addFirst(snapshotFragment); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java index bc3685a3..c17671a7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java @@ -24,19 +24,18 @@ package org.opendc.simulator.compute.workload.trace; import org.opendc.common.ResourceType; -public record TraceFragment( - long duration, double cpuUsage, int cpuCoreCount, double gpuUsage, int gpuCoreCount, Long gpuMemoryUsage) { +public record TraceFragment(long duration, double cpuUsage, double gpuUsage, int gpuMemoryUsage) { - public TraceFragment(long start, long duration, double cpuUsage, int cpuCoreCount) { - this(duration, cpuUsage, cpuCoreCount, 0.0, 0, 0L); + public TraceFragment(long start, long duration, double cpuUsage) { + this(duration, cpuUsage, 0.0, 0); } - public TraceFragment(long duration, double cpuUsage, int cpuCoreCount) { - this(duration, cpuUsage, cpuCoreCount, 0.0, 0, 0L); + public TraceFragment(long duration, double cpuUsage) { + this(duration, cpuUsage, 0.0, 0); } - public TraceFragment(long duration, double cpuUsage, int cpuCoreCount, double gpuUsage, int gpuCoreCount) { - this(duration, cpuUsage, cpuCoreCount, gpuUsage, gpuCoreCount, 0L); + public TraceFragment(long duration, double cpuUsage, double gpuUsage) { + this(duration, cpuUsage, gpuUsage, 0); } /** @@ -53,18 +52,4 @@ public record TraceFragment( default -> throw new IllegalArgumentException("Invalid resource type: " + resourceType); }; } - - /** - * Returns the core count for the specified resource type. - * - * @param resourceType the type of resource - * @return the core count for the specified resource type - */ - public int getCoreCount(ResourceType resourceType) throws IllegalArgumentException { - return switch (resourceType) { - case CPU -> cpuCoreCount; - case GPU -> gpuCoreCount; - default -> throw new IllegalArgumentException("Invalid resource type: " + resourceType); - }; - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index d698a48d..53ce9f31 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -41,12 +41,10 @@ public class TraceWorkload implements Workload { private final long checkpointDuration; private final double checkpointIntervalScaling; private final double maxCpuDemand; - private final int maxCpuCoreCount; private final double maxGpuDemand; - private final int maxGpuCoreCount; - private final long maxGpuMemoryDemand; - private final String taskName; - private ResourceType[] resourceTypes = new ResourceType[ResourceType.values().length]; + private final int maxGpuMemoryDemand; + private final int taskId; + private final ResourceType[] resourceTypes; public ScalingPolicy getScalingPolicy() { return scalingPolicy; @@ -60,36 +58,25 @@ public class TraceWorkload implements Workload { long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy, - String taskName, + int taskId, ResourceType[] resourceTypes) { this.fragments = fragments; this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; this.scalingPolicy = scalingPolicy; - this.taskName = taskName; + this.taskId = taskId; // TODO: remove if we decide not to use it. this.maxCpuDemand = fragments.stream() .max(Comparator.comparing(TraceFragment::cpuUsage)) .get() - // .cpuUsage(); .getResourceUsage(ResourceType.CPU); - this.maxCpuCoreCount = fragments.stream() - .max(Comparator.comparing(TraceFragment::cpuCoreCount)) - .get() - // .cpuCoreCount(); - .getCoreCount(ResourceType.CPU); - this.maxGpuDemand = fragments.stream() .max(Comparator.comparing(TraceFragment::gpuUsage)) .get() .getResourceUsage(ResourceType.GPU); - this.maxGpuCoreCount = fragments.stream() - .max(Comparator.comparing(TraceFragment::gpuCoreCount)) - .get() - .getCoreCount(ResourceType.GPU); - this.maxGpuMemoryDemand = 0L; // TODO: add GPU memory demand to the trace fragments + this.maxGpuMemoryDemand = 0; // TODO: add GPU memory demand to the trace fragments this.resourceTypes = resourceTypes; } @@ -113,10 +100,6 @@ public class TraceWorkload implements Workload { return checkpointIntervalScaling; } - public int getMaxCoreCount() { - return maxCpuCoreCount; - } - public double getMaxCpuDemand() { return maxCpuDemand; } @@ -125,16 +108,12 @@ public class TraceWorkload implements Workload { return maxGpuDemand; } - public int getMaxGpuCoreCount() { - return maxGpuCoreCount; - } - - public long getMaxGpuMemoryDemand() { + public int getMaxGpuMemoryDemand() { return maxGpuMemoryDemand; } - public String getTaskName() { - return taskName; + public int getTaskId() { + return taskId; } public void removeFragments(int numberOfFragments) { @@ -171,8 +150,8 @@ public class TraceWorkload implements Workload { long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy, - String taskName) { - return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy, taskName); + int taskId) { + return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy, taskId); } public static final class Builder { @@ -181,7 +160,7 @@ public class TraceWorkload implements Workload { private final long checkpointDuration; private final double checkpointIntervalScaling; private final ScalingPolicy scalingPolicy; - private final String taskName; + private final int taskId; private final ResourceType[] resourceTypes = new ResourceType[ResourceType.values().length]; /** @@ -192,13 +171,13 @@ public class TraceWorkload implements Workload { long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy, - String taskName) { + int taskId) { this.fragments = new ArrayList<>(); this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; this.scalingPolicy = scalingPolicy; - this.taskName = taskName; + this.taskId = taskId; } /** @@ -206,22 +185,17 @@ public class TraceWorkload implements Workload { * * @param duration The timestamp at which the fragment ends (in epoch millis). * @param cpuUsage The CPU usage at this fragment. - * @param cpuCores The number of cores used during this fragment. * @param gpuUsage The GPU usage at this fragment. - * @param gpuCores The number of GPU cores used during this fragment. * @param gpuMemoryUsage The GPU memory usage at this fragment. */ - public void add( - long duration, double cpuUsage, int cpuCores, double gpuUsage, int gpuCores, long gpuMemoryUsage) { + public void add(long duration, double cpuUsage, double gpuUsage, int gpuMemoryUsage) { if (cpuUsage > 0.0) { this.resourceTypes[ResourceType.CPU.ordinal()] = ResourceType.CPU; } if (gpuUsage > 0.0) { this.resourceTypes[ResourceType.GPU.ordinal()] = ResourceType.GPU; } - fragments.add( - fragments.size(), - new TraceFragment(duration, cpuUsage, cpuCores, gpuUsage, gpuCores, gpuMemoryUsage)); + fragments.add(fragments.size(), new TraceFragment(duration, cpuUsage, gpuUsage, gpuMemoryUsage)); } /** @@ -234,7 +208,7 @@ public class TraceWorkload implements Workload { this.checkpointDuration, this.checkpointIntervalScaling, this.scalingPolicy, - this.taskName, + this.taskId, this.resourceTypes); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index c388293b..501bbf10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; * It also provides methods to update outgoing demands and supplies based on the incoming demands and supplies. * This class is abstract and should be extended by specific implementations that define the distribution strategy. * It uses a {@link FlowDistributorFactory.DistributionPolicy} to determine how to distribute the supply among the consumers. - * The default distribution policy is {@link MaxMinFairnessPolicy}, which distributes the supply fairly among the consumers. + * The default distribution policy is MaxMinFairnessPolicy, which distributes the supply fairly among the consumers. */ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); @@ -178,7 +178,9 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, // supplierIndex not always set, so we use 0 as default to avoid index out of bounds int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); // to keep index consistent, entries are neutralized instead of removed - this.supplierEdges.put(idx, null); + // this.supplierEdges.put(idx, null); + + this.supplierEdges.remove(idx); this.capacity -= supplierEdge.getCapacity(); this.currentIncomingSupplies.put(idx, 0.0); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java index 9b48f204..371015a4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -44,12 +44,8 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { } protected void updateOutgoingDemand() { - // equally distribute the demand to all suppliers for (FlowEdge supplierEdge : this.supplierEdges.values()) { this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); - // alternatively a relative share could be used, based on capacity minus current incoming supply - // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - - // currentIncomingSupplies.get(idx) / supplierEdges.size())); } this.outgoingDemandUpdateNeeded = false; -- cgit v1.2.3