summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt29
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt5
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt78
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt23
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt12
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt15
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt14
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt139
-rw-r--r--opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt3
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt17
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt14
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt68
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt58
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt12
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json23
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json37
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquetbin0 -> 20422 bytes
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquetbin0 -> 5368 bytes
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java32
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java149
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java66
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java88
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java13
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt12
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt2
32 files changed, 553 insertions, 386 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java
index e42d7704..e7790975 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java
@@ -43,4 +43,5 @@ public record HostGpuStats(
double capacity,
double demand,
double usage,
- double utilization) {}
+ double utilization,
+ double powerDraw) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
index effe3d5b..b7d3b730 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -361,6 +361,7 @@ public class SimHost(
for (gpu in simMachine!!.gpus) {
gpu.updateCounters(this.clock.millis())
val counters = simMachine!!.getGpuPerformanceCounters(gpu.id)
+ val powerDraw = simMachine!!.psu.getPowerDraw(ResourceType.GPU, gpu.id)
gpuStats.add(
HostGpuStats(
@@ -372,13 +373,14 @@ public class SimHost(
counters.demand,
counters.supply,
counters.supply / gpu.getCapacity(ResourceType.GPU),
+ powerDraw,
),
)
}
return gpuStats
}
- public fun getGpuStats(task: ServiceTask): List<GuestGpuStats> {
+ public fun getGpuStats(task: ServiceTask): GuestGpuStats? {
val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" }
return guest.getGpuStats()
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index a980f6cb..40de94bb 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -258,27 +258,24 @@ public class Guest(
)
}
- public fun getGpuStats(): List<GuestGpuStats> {
+ public fun getGpuStats(): GuestGpuStats? {
virtualMachine!!.updateCounters(this.clock.millis())
val counters = virtualMachine!!.gpuPerformanceCounters
- val gpuStats = mutableListOf<GuestGpuStats>()
- for (gpuCounter in counters) {
- gpuStats.add(
- GuestGpuStats(
- gpuCounter.activeTime / 1000L,
- gpuCounter.idleTime / 1000L,
- gpuCounter.stealTime / 1000L,
- gpuCounter.lostTime / 1000L,
- gpuCounter.capacity,
- gpuCounter.supply,
- gpuCounter.demand,
- // Assuming similar scaling as CPU
- gpuCounter.supply / gpuLimit,
- ),
+ return if (counters == null) {
+ null
+ } else {
+ GuestGpuStats(
+ counters.activeTime / 1000L,
+ counters.idleTime / 1000L,
+ counters.stealTime / 1000L,
+ counters.lostTime / 1000L,
+ counters.capacity,
+ counters.supply,
+ counters.demand,
+ counters.supply / gpuLimit,
)
}
- return gpuStats
}
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
index c7549433..43b9b449 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
@@ -217,6 +217,11 @@ public data class ComputeExportConfig(
}
}
+public fun ComputeExportConfig.withGpuColumns(count: Int): ComputeExportConfig {
+ val hostCols = hostExportColumns + DfltHostExportColumns.gpuColumns(count)
+ return copy(hostExportColumns = hostCols)
+}
+
private val json = Json { ignoreUnknownKeys = true }
private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> =
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt
index affaab58..7a091379 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt
@@ -144,48 +144,6 @@ public object DfltHostExportColumns {
field = Types.required(INT64).named("cpu_time_lost"),
) { it.cpuLostTime }
- // TODO: support multiple GPUs
-
- public val GPU_CAPACITY: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(FLOAT).named("gpu_capacity"),
- ) { it.gpuCapacities.getOrNull(0) }
-
- public val GPU_USAGE: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(FLOAT).named("gpu_usage"),
- ) { it.gpuUsages.getOrNull(0) }
-
- public val GPU_DEMAND: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(FLOAT).named("gpu_demand"),
- ) { it.gpuDemands.getOrNull(0) }
-
- public val GPU_UTILIZATION: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(FLOAT).named("gpu_utilization"),
- ) { it.gpuUtilizations.getOrNull(0) }
-
- public val GPU_TIME_ACTIVE: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(INT64).named("gpu_time_active"),
- ) { it.gpuActiveTimes.getOrNull(0) }
-
- public val GPU_TIME_IDLE: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(INT64).named("gpu_time_idle"),
- ) { it.gpuIdleTimes.getOrNull(0) }
-
- public val GPU_TIME_STEAL: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(INT64).named("gpu_time_steal"),
- ) { it.gpuStealTimes.getOrNull(0) }
-
- public val GPU_TIME_LOST: ExportColumn<HostTableReader> =
- ExportColumn(
- field = Types.optional(INT64).named("gpu_time_lost"),
- ) { it.gpuLostTimes.getOrNull(0) }
-
public val POWER_DRAW: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(FLOAT).named("power_draw"),
@@ -217,6 +175,42 @@ public object DfltHostExportColumns {
) { it.bootTime?.toEpochMilli() }
/**
+ * Returns GPU-related export columns for the given number of GPUs.
+ */
+ public fun gpuColumns(count: Int): Set<ExportColumn<HostTableReader>> =
+ (0 until count).flatMap { i ->
+ listOf<ExportColumn<HostTableReader>>(
+ ExportColumn(
+ field = Types.optional(FLOAT).named("gpu_capacity_$i"),
+ ) { it.gpuCapacities.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(FLOAT).named("gpu_usage_$i"),
+ ) { it.gpuUsages.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(FLOAT).named("gpu_demand_$i"),
+ ) { it.gpuDemands.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(FLOAT).named("gpu_utilization_$i"),
+ ) { it.gpuUtilizations.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(INT64).named("gpu_time_active_$i"),
+ ) { it.gpuActiveTimes.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(INT64).named("gpu_time_idle_$i"),
+ ) { it.gpuIdleTimes.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(INT64).named("gpu_time_steal_$i"),
+ ) { it.gpuStealTimes.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(INT64).named("gpu_time_lost_$i"),
+ ) { it.gpuLostTimes.getOrNull(i) },
+ ExportColumn(
+ field = Types.optional(FLOAT).named("gpu_power_draw_$i"),
+ ) { it.gpuPowerDraws.getOrNull(i) },
+ )
+ }.toSet()
+
+ /**
* The columns that are always included in the output file.
*/
internal val BASE_EXPORT_COLUMNS =
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
index ad7a1d52..07750114 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
@@ -132,42 +132,45 @@ public object DfltTaskExportColumns {
field = Types.required(INT64).named("cpu_time_lost"),
) { it.cpuLostTime }
- // TODO: support multiple GPUs
+ public val GPU_COUNT: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT32).named("gpu_count"),
+ ) { it.taskInfo.gpuCount }
- public val GPU_CAPACITY: ExportColumn<TaskTableReader> =
+ public val GPU_LIMIT: ExportColumn<TaskTableReader> =
ExportColumn(
- field = Types.optional(FLOAT).named("gpu_capacity"),
- ) { it.gpuLimits?.getOrNull(0) }
+ field = Types.optional(FLOAT).named("gpu_limit"),
+ ) { it.gpuLimit }
public val GPU_USAGE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(FLOAT).named("gpu_usage"),
- ) { it.gpuUsages?.getOrNull(0) }
+ ) { it.gpuUsage }
public val GPU_DEMAND: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(FLOAT).named("gpu_demand"),
- ) { it.gpuDemands?.getOrNull(0) }
+ ) { it.gpuDemand }
public val GPU_TIME_ACTIVE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(INT64).named("gpu_time_active"),
- ) { it.gpuActiveTimes?.getOrNull(0) }
+ ) { it.gpuActiveTime }
public val GPU_TIME_IDLE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(INT64).named("gpu_time_idle"),
- ) { it.gpuIdleTimes?.getOrNull(0) }
+ ) { it.gpuIdleTime }
public val GPU_TIME_STEAL: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(INT64).named("gpu_time_steal"),
- ) { it.gpuStealTimes?.getOrNull(0) }
+ ) { it.gpuStealTime }
public val GPU_TIME_LOST: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(INT64).named("gpu_time_lost"),
- ) { it.gpuLostTimes?.getOrNull(0) }
+ ) { it.gpuLostTime }
public val UP_TIME: ExportColumn<TaskTableReader> =
ExportColumn(
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt
index fbffd508..f904ac9e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt
@@ -113,16 +113,11 @@ public interface HostTableReader : Exportable {
public val cpuLostTime: Long
/**
- * The capacity of the CPUs in the host (in MHz).
+ * The capacity of the GPUs in the host (in MHz).They inserted by GPU ID.
*/
public val gpuCapacities: ArrayList<Double>
/**
- * The capacity of the GPUs in the host (in MHz). They inserted by GPU ID.
- */
- public val gpuLimits: ArrayList<Double>
-
- /**
* The usage per GPU in the host (in MHz). They inserted by GPU ID
*/
public val gpuUsages: ArrayList<Double>
@@ -158,6 +153,11 @@ public interface HostTableReader : Exportable {
public val gpuLostTimes: ArrayList<Long>
/**
+ * The power draw of the respective GPU in the host (in W). They inserted by GPU ID.
+ */
+ public val gpuPowerDraws: ArrayList<Double>
+
+ /**
* The current power draw of the host in W.
*/
public val powerDraw: Double
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt
index cb25358a..e46edc88 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt
@@ -60,7 +60,6 @@ public class HostTableReaderImpl(
_cpuLostTime = table.cpuLostTime
// GPU stats
_gpuCapacities = table.gpuCapacities
- _gpuLimits = table.gpuLimits
_gpuDemands = table.gpuDemands
_gpuUsages = table.gpuUsages
_gpuUtilizations = table.gpuUtilizations
@@ -68,6 +67,7 @@ public class HostTableReaderImpl(
_gpuIdleTimes = table.gpuIdleTimes
_gpuStealTimes = table.gpuStealTimes
_gpuLostTimes = table.gpuLostTimes
+ _gpuPowerDraws = table.gpuPowerDraws
// energy & carbon stats
_powerDraw = table.powerDraw
@@ -152,10 +152,6 @@ public class HostTableReaderImpl(
get() = _gpuCapacities
private var _gpuCapacities: ArrayList<Double> = ArrayList()
- override val gpuLimits: ArrayList<Double>
- get() = _gpuLimits
- private var _gpuLimits: ArrayList<Double> = ArrayList()
-
override val gpuUsages: ArrayList<Double>
get() = _gpuUsages
private var _gpuUsages: ArrayList<Double> = ArrayList()
@@ -170,7 +166,6 @@ public class HostTableReaderImpl(
// half of the CPU stats
override val gpuActiveTimes: ArrayList<Long>
-// get() = _gpuActiveTimes.zip(previousGpuActiveTimes) { current, previous -> current - previous} as ArrayList<Long>
get() =
(0 until _gpuActiveTimes.size).map {
i ->
@@ -180,7 +175,6 @@ public class HostTableReaderImpl(
private var previousGpuActiveTimes: ArrayList<Long> = ArrayList()
override val gpuIdleTimes: ArrayList<Long>
-// get() = _gpuIdleTimes.zip(previousGpuIdleTimes) { current, previous -> current - previous} as ArrayList<Long>
get() =
(0 until _gpuIdleTimes.size).map {
i ->
@@ -207,6 +201,10 @@ public class HostTableReaderImpl(
private var _gpuLostTimes: ArrayList<Long> = ArrayList()
private var previousGpuLostTimes: ArrayList<Long> = ArrayList()
+ override val gpuPowerDraws: ArrayList<Double>
+ get() = _gpuPowerDraws
+ private var _gpuPowerDraws: ArrayList<Double> = ArrayList()
+
override val powerDraw: Double
get() = _powerDraw
private var _powerDraw = 0.0
@@ -258,7 +256,7 @@ public class HostTableReaderImpl(
_cpuStealTime = hostCpuStats.stealTime
_cpuLostTime = hostCpuStats.lostTime
// GPU stats
- _gpuLimits = hostGpuStats.map { it.capacity } as ArrayList<Double>
+ _gpuCapacities = hostGpuStats.map { it.capacity } as ArrayList<Double>
_gpuDemands = hostGpuStats.map { it.demand } as ArrayList<Double>
_gpuUsages = hostGpuStats.map { it.usage } as ArrayList<Double>
_gpuUtilizations = hostGpuStats.map { it.utilization } as ArrayList<Double>
@@ -266,6 +264,7 @@ public class HostTableReaderImpl(
_gpuIdleTimes = hostGpuStats.map { it.idleTime } as ArrayList<Long>
_gpuStealTimes = hostGpuStats.map { it.stealTime } as ArrayList<Long>
_gpuLostTimes = hostGpuStats.map { it.lostTime } as ArrayList<Long>
+ _gpuPowerDraws = hostGpuStats.map { it.powerDraw } as ArrayList<Double>
// energy & carbon stats
_powerDraw = hostSysStats.powerDraw
_energyUsage = hostSysStats.energyUsage
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt
index e0b28379..c1a14613 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt
@@ -32,4 +32,5 @@ public data class TaskInfo(
val arch: String,
val cpuCount: Int,
val memCapacity: Long,
+ val gpuCount: Int,
)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
index f71587c7..e3860606 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
@@ -133,37 +133,37 @@ public interface TaskTableReader : Exportable {
/**
* The capacity of the GPUs of Host on which the task is running (in MHz).
*/
- public val gpuLimits: DoubleArray?
+ public val gpuLimit: Double?
/**
* The amount of GPus allocated to the task (in MHz).
*/
- public val gpuUsages: DoubleArray?
+ public val gpuUsage: Double?
/**
* The GPU demanded by this task (in MHz).
*/
- public val gpuDemands: DoubleArray?
+ public val gpuDemand: Double?
/**
* The duration (in seconds) that a GPU was active in the task.
*/
- public val gpuActiveTimes: LongArray?
+ public val gpuActiveTime: Long?
/**
* The duration (in seconds) that a GPU was idle in the task.
*/
- public val gpuIdleTimes: LongArray?
+ public val gpuIdleTime: Long?
/**
* The duration (in seconds) that a vGPU wanted to run, but no capacity was available.
*/
- public val gpuStealTimes: LongArray?
+ public val gpuStealTime: Long?
/**
* The duration (in seconds) of GPU time that was lost due to interference.
*/
- public val gpuLostTimes: LongArray?
+ public val gpuLostTime: Long?
/**
* The state of the task
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
index 6128c9a2..ce62fdb0 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
@@ -64,13 +64,13 @@ public class TaskTableReaderImpl(
_cpuStealTime = table.cpuStealTime
_cpuLostTime = table.cpuLostTime
// GPU stats
- _gpuLimits = table.gpuLimits
- _gpuDemands = table.gpuDemands
- _gpuUsages = table.gpuUsages
- _gpuActiveTimes = table.gpuActiveTimes
- _gpuIdleTimes = table.gpuIdleTimes
- _gpuStealTimes = table.gpuStealTimes
- _gpuLostTimes = table.gpuLostTimes
+ _gpuLimit = table.gpuLimit
+ _gpuDemand = table.gpuDemand
+ _gpuUsage = table.gpuUsage
+ _gpuActiveTime = table.gpuActiveTime
+ _gpuIdleTime = table.gpuIdleTime
+ _gpuStealTime = table.gpuStealTime
+ _gpuLostTime = table.gpuLostTime
_uptime = table.uptime
_downtime = table.downtime
@@ -95,6 +95,7 @@ public class TaskTableReaderImpl(
"x86",
task.flavor.cpuCoreCount,
task.flavor.memorySize,
+ task.flavor.gpuCoreCount,
)
/**
@@ -177,73 +178,37 @@ public class TaskTableReaderImpl(
private var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
- override val gpuLimits: DoubleArray?
- get() = _gpuLimits ?: DoubleArray(0)
- private var _gpuLimits: DoubleArray? = null
+ override val gpuLimit: Double?
+ get() = _gpuLimit
+ private var _gpuLimit: Double? = 0.0
- override val gpuUsages: DoubleArray?
- get() = _gpuUsages ?: DoubleArray(0)
- private var _gpuUsages: DoubleArray? = null
+ override val gpuUsage: Double?
+ get() = _gpuUsage
+ private var _gpuUsage: Double? = 0.0
- override val gpuDemands: DoubleArray?
- get() = _gpuDemands ?: DoubleArray(0)
- private var _gpuDemands: DoubleArray? = null
+ override val gpuDemand: Double?
+ get() = _gpuDemand
+ private var _gpuDemand: Double? = 0.0
- override val gpuActiveTimes: LongArray?
- get() {
- val current = _gpuActiveTimes ?: return LongArray(0)
- val previous = previousGpuActiveTimes
+ override val gpuActiveTime: Long?
+ get() = (_gpuActiveTime ?: 0L) - (previousGpuActiveTime ?: 0L)
+ private var _gpuActiveTime: Long? = null
+ private var previousGpuActiveTime: Long? = null
- return if (previous == null || current.size != previous.size) { // not sure if I like the second clause
- current
- } else {
- LongArray(current.size) { i -> current[i] - previous[i] }
- }
- }
- private var _gpuActiveTimes: LongArray? = null
- private var previousGpuActiveTimes: LongArray? = null
-
- override val gpuIdleTimes: LongArray?
- get() {
- val current = _gpuIdleTimes ?: return LongArray(0)
- val previous = previousGpuIdleTimes
-
- return if (previous == null || current.size != previous.size) { // not sure if I like the second clause
- current
- } else {
- LongArray(current.size) { i -> current[i] - previous[i] }
- }
- }
- private var _gpuIdleTimes: LongArray? = null
- private var previousGpuIdleTimes: LongArray? = null
-
- override val gpuStealTimes: LongArray?
- get() {
- val current = _gpuStealTimes ?: return LongArray(0)
- val previous = previousGpuStealTimes
-
- return if (previous == null || current.size != previous.size) {
- current
- } else {
- LongArray(current.size) { i -> current[i] - previous[i] }
- }
- }
- private var _gpuStealTimes: LongArray? = null
- private var previousGpuStealTimes: LongArray? = null
-
- override val gpuLostTimes: LongArray?
- get() {
- val current = _gpuLostTimes ?: return LongArray(0)
- val previous = previousGpuLostTimes
-
- return if (previous == null || current.size != previous.size) {
- current
- } else {
- LongArray(current.size) { i -> current[i] - previous[i] }
- }
- }
- private var _gpuLostTimes: LongArray? = null
- private var previousGpuLostTimes: LongArray? = null
+ override val gpuIdleTime: Long?
+ get() = (_gpuIdleTime ?: 0L) - (previousGpuIdleTime ?: 0L)
+ private var _gpuIdleTime: Long? = null
+ private var previousGpuIdleTime: Long? = null
+
+ override val gpuStealTime: Long?
+ get() = (_gpuStealTime ?: 0L) - (previousGpuStealTime ?: 0L)
+ private var _gpuStealTime: Long? = null
+ private var previousGpuStealTime: Long? = null
+
+ override val gpuLostTime: Long?
+ get() = (_gpuLostTime ?: 0L) - (previousGpuLostTime ?: 0L)
+ private var _gpuLostTime: Long? = null
+ private var previousGpuLostTime: Long? = null
override val taskState: TaskState?
get() = _taskState
@@ -292,24 +257,14 @@ public class TaskTableReaderImpl(
_scheduleTime = task.scheduledAt
_finishTime = task.finishedAt
- if (gpuStats != null && gpuStats.isNotEmpty()) {
- val size = gpuStats.size
- _gpuLimits = DoubleArray(size) { i -> gpuStats[i].capacity }
- _gpuDemands = DoubleArray(size) { i -> gpuStats[i].demand }
- _gpuUsages = DoubleArray(size) { i -> gpuStats[i].usage }
- _gpuActiveTimes = LongArray(size) { i -> gpuStats[i].activeTime }
- _gpuIdleTimes = LongArray(size) { i -> gpuStats[i].idleTime }
- _gpuStealTimes = LongArray(size) { i -> gpuStats[i].stealTime }
- _gpuLostTimes = LongArray(size) { i -> gpuStats[i].lostTime }
- } else {
- _gpuIdleTimes = null
- _gpuStealTimes = null
- _gpuLostTimes = null
- _gpuIdleTimes = null
- _gpuLimits = null
- _gpuUsages = null
- _gpuDemands = null
- _gpuActiveTimes = null
+ if (gpuStats != null) {
+ _gpuLimit = gpuStats.capacity
+ _gpuDemand = gpuStats.demand
+ _gpuUsage = gpuStats.usage
+ _gpuActiveTime = gpuStats.activeTime
+ _gpuIdleTime = gpuStats.idleTime
+ _gpuStealTime = gpuStats.stealTime
+ _gpuLostTime = gpuStats.lostTime
}
_taskState = task.state
@@ -325,10 +280,10 @@ public class TaskTableReaderImpl(
previousCpuIdleTime = _cpuIdleTime
previousCpuStealTime = _cpuStealTime
previousCpuLostTime = _cpuLostTime
- previousGpuActiveTimes = _gpuActiveTimes
- previousGpuIdleTimes = _gpuIdleTimes
- previousGpuStealTimes = _gpuStealTimes
- previousGpuLostTimes = _gpuLostTimes
+ previousGpuActiveTime = _gpuActiveTime
+ previousGpuIdleTime = _gpuIdleTime
+ previousGpuStealTime = _gpuStealTime
+ previousGpuLostTime = _gpuLostTime
simHost = null
_cpuLimit = 0.0
diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt
index b52608a9..09a8fe64 100644
--- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt
+++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt
@@ -157,6 +157,7 @@ private fun ClusterJSONSpec.toClusterSpec(): ClusterSpec {
* Helper method to convert a [HostJSONSpec] into a [HostSpec]s.
*/
private var globalCoreId = 0
+private var globalGpuId = 0
private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec {
val units =
@@ -172,7 +173,7 @@ private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec {
val gpuUnits =
List(gpu?.count ?: 0) {
GpuModel(
- globalCoreId++,
+ globalGpuId++,
gpu!!.coreCount,
gpu.coreSpeed.toMHz(),
gpu.memoryBandwidth.toKibps(),
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index f7c444ea..e14a06cc 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -30,7 +30,9 @@ import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
+import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns
import org.opendc.compute.topology.clusterTopology
import org.opendc.experiments.base.experiment.Scenario
import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec
@@ -125,7 +127,17 @@ public fun runScenario(
setupHosts(serviceDomain, topology, startTimeLong),
)
- addExportModel(provisioner, serviceDomain, scenario, seed, startTime, scenario.id)
+ val gpuCount = topology.flatMap { it.hostSpecs }.maxOfOrNull { it.model.gpuModels.size } ?: 0
+ addExportModel(
+ provisioner,
+ serviceDomain,
+ scenario,
+ seed,
+ startTime,
+ scenario.id,
+ computeExportConfig =
+ scenario.exportModelSpec.computeExportConfig.withGpuColumns(gpuCount),
+ )
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
service.setTasksExpected(workload.size)
@@ -180,6 +192,7 @@ public fun addExportModel(
seed: Long,
startTime: Duration,
index: Int,
+ computeExportConfig: ComputeExportConfig = scenario.exportModelSpec.computeExportConfig,
) {
provisioner.runStep(
registerComputeMonitor(
@@ -189,7 +202,7 @@ public fun addExportModel(
"seed=$seed",
bufferSize = 4096,
scenario.exportModelSpec.filesToExportDict,
- computeExportConfig = scenario.exportModelSpec.computeExportConfig,
+ computeExportConfig = computeExportConfig,
),
Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
startTime,
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt
index 6365f60d..cd16f174 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt
@@ -43,4 +43,18 @@ class ExperimentRunnerTest {
val someDir = File("output")
someDir.deleteRecursively()
}
+
+ /**
+ * ExperimentRunner test 2
+ * This test runs the experiment defined in the experiment_2.json file.
+ *
+ * In this test, parts of the Marconi 100 workload is executed . This trace contains GPU tasks.
+ */
+ @Test
+ fun testExperimentRunner2() {
+ ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_2.json"))
+
+ val someDir = File("output")
+ someDir.deleteRecursively()
+ }
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
index 7b7b23d2..606ba571 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
@@ -595,10 +595,10 @@ class FlowDistributorTest {
{ assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task
- { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(10)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(10)?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(10)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(10)) { "The gpu used by task 0 is incorrect" } },
// host
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } },
@@ -643,10 +643,10 @@ class FlowDistributorTest {
{ assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task
- { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } },
// host
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } },
@@ -691,10 +691,10 @@ class FlowDistributorTest {
{ assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task
- { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } },
// host
{ assertEquals(2000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } },
@@ -737,10 +737,10 @@ class FlowDistributorTest {
{ assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task
- { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } },
// host
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } },
@@ -799,17 +799,17 @@ class FlowDistributorTest {
{ assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task 0
- { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } },
// task 1
- { assert(monitor.taskGpuDemands["1"]?.get(0)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(10)?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
- { assert(monitor.taskGpuDemands["1"]?.get(19)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } },
- { assert(monitor.taskGpuSupplied["1"]?.get(0)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(10)?.get(0)) { "The gpu used by task 1 is incorrect" } },
- { assert(monitor.taskGpuSupplied["1"]?.get(19)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuDemands["1"]?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(10)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(19)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuSupplied["1"]?.get(0)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(10)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(19)) { "The gpu used by task 1 is incorrect" } },
// host
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } },
@@ -865,15 +865,15 @@ class FlowDistributorTest {
{ assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
// GPU
// task 0
- { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } },
// task 1
- { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(0)?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
- { assert(monitor.taskGpuDemands["1"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } },
- { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(0)?.get(0)) { "The gpu used by task 1 is incorrect" } },
- { assert(monitor.taskGpuSupplied["1"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(9)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(0)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(9)) { "The gpu used by task 1 is incorrect" } },
// host
{ assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } },
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt
index 6e5a6b5e..2778e613 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt
@@ -119,17 +119,16 @@ class GpuTest {
@Test
fun testGpuHostCreationMultiMinimal() {
val topology = createTopology("Gpus/multi_gpu_no_vendor_no_memory.json")
- val count = 3
assertGpuConfiguration(
topology,
- coreCount = 1 * count,
+ coreCount = 1,
coreSpeed = 2000.0,
- memorySize = -1L * count,
+ memorySize = -1L,
memoryBandwidth = -1.0,
vendor = "unknown",
modelName = "unknown",
architecture = "unknown",
- gpuCount = 1,
+ gpuCount = 3,
)
}
@@ -139,18 +138,17 @@ class GpuTest {
@Test
fun testGpuHostCreationMultiWithMemoryNoVendor() {
val topology = createTopology("Gpus/multi_gpu_no_vendor.json")
- val count = 100
assertGpuConfiguration(
topology,
- coreCount = 1 * count,
+ coreCount = 1,
coreSpeed = 2000.0,
- memorySize = 4096L * count,
+ memorySize = 4096L,
memoryBandwidth = 500.0,
vendor = "unknown",
modelName = "unknown",
architecture = "unknown",
- gpuCount = 1,
+ gpuCount = 100,
)
}
@@ -160,17 +158,16 @@ class GpuTest {
@Test
fun testGpuHostCreationMultiNoMemoryWithVendor() {
val topology = createTopology("Gpus/multi_gpu_no_memory.json")
- val count = 2
assertGpuConfiguration(
topology,
- coreCount = 1 * count,
+ coreCount = 1,
coreSpeed = 2000.0,
- memorySize = -1L * count,
+ memorySize = -1L,
memoryBandwidth = -1.0,
vendor = "NVIDIA",
modelName = "Tesla V100",
architecture = "Volta",
- gpuCount = 1,
+ gpuCount = 2,
)
}
@@ -180,19 +177,20 @@ class GpuTest {
@Test
fun testGpuHostCreationMultiWithMemoryWithVendor() {
val topology = createTopology("Gpus/multi_gpu_full.json")
+ // temporary implementation, to account for GPU concatenation
val count = 5
assertGpuConfiguration(
topology,
// cuda cores
- coreCount = 5120 * count,
+ coreCount = 5120,
// fictional value
coreSpeed = 5000.0,
- memorySize = 30517578125 * count,
+ memorySize = 30517578125,
memoryBandwidth = 7031250000.0,
vendor = "NVIDIA",
modelName = "Tesla V100",
architecture = "Volta",
- gpuCount = 1,
+ gpuCount = 5,
)
}
@@ -243,20 +241,26 @@ class GpuTest {
{ assertEquals(2000.0, monitor.hostCpuSupplied["DualGpuHost"]?.get(9)) { "The cpu used by the host is incorrect" } },
// GPU
// task 0
- { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(8)?.get(0)) { "The gpu demanded by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(8)?.get(0)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(8)) { "The gpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(8)) { "The gpu used by task 0 is incorrect" } },
// task 1
- { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1)?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(8)?.get(0)) { "The gpu demanded by task 1 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(1)?.get(0)) { "The gpu used by task 1 is incorrect" } },
- { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(8)?.get(0)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(8)) { "The gpu demanded by task 1 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(1)) { "The gpu used by task 1 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(8)) { "The gpu used by task 1 is incorrect" } },
// host
- { assertEquals(4000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(0)) { "The gpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(0)) { "The gpu used by the host is incorrect" } },
+ // GPU 0
+ { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(0)) { "The gpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(0)) { "The gpu used by the host is incorrect" } },
+ // GPU 1
+ { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1)) { "The gpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(1)) { "The gpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1)) { "The gpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(1)) { "The gpu used by the host is incorrect" } },
)
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
index 59b8d070..7b3db348 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
@@ -148,8 +148,8 @@ fun runTest(
class TestComputeMonitor : ComputeMonitor {
var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>()
var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>()
- var taskGpuDemands = mutableMapOf<String, ArrayList<DoubleArray?>>()
- var taskGpuSupplied = mutableMapOf<String, ArrayList<DoubleArray?>>()
+ var taskGpuDemands = mutableMapOf<String, ArrayList<Double?>?>()
+ var taskGpuSupplied = mutableMapOf<String, ArrayList<Double?>?>()
override fun record(reader: TaskTableReader) {
val taskName: String = reader.taskInfo.name
@@ -162,11 +162,11 @@ class TestComputeMonitor : ComputeMonitor {
taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage)
}
if (taskName in taskGpuDemands) {
- taskGpuDemands[taskName]?.add(reader.gpuDemands)
- taskGpuSupplied[taskName]?.add(reader.gpuUsages)
+ taskGpuDemands[taskName]?.add(reader.gpuDemand)
+ taskGpuSupplied[taskName]?.add(reader.gpuUsage)
} else {
- taskGpuDemands[taskName] = arrayListOf(reader.gpuDemands)
- taskGpuSupplied[taskName] = arrayListOf(reader.gpuUsages)
+ taskGpuDemands[taskName] = arrayListOf(reader.gpuDemand)
+ taskGpuSupplied[taskName] = arrayListOf(reader.gpuUsage)
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json b/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json
new file mode 100644
index 00000000..65fb4b5f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json
@@ -0,0 +1,23 @@
+{
+ "topologies": [
+ {
+ "pathToFile": "src/test/resources/topologies/Gpus/small_gpu.json"
+ }
+ ],
+ "workloads": [{
+ "pathToFile": "src/test/resources/workloadTraces/small_gpu",
+ "type": "ComputeWorkload"
+ }],
+ "allocationPolicies": [
+ {
+ "type": "prefab",
+ "name": "ProvisionedCpuGpuCores"
+ }
+ ],
+ "exportModels": [
+ {
+ "exportInterval": 3600,
+ "printFrequency": 24
+ }
+ ]
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json
new file mode 100644
index 00000000..4848c303
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json
@@ -0,0 +1,37 @@
+{
+ "clusters": [
+ {
+ "name": "C01",
+ "hosts": [
+ {
+ "name": "H01",
+ "cpu": {
+ "coreCount": 4,
+ "coreSpeed": 2000
+ },
+ "memory": {
+ "memorySize": 140457600000
+ },
+ "cpuPowerModel": {
+ "modelType": "linear",
+ "power": 400.0,
+ "idlePower": 100.0,
+ "maxPower": 200.0
+ },
+ "gpu": {
+ "coreCount": 2,
+ "coreSpeed": 2000,
+ "count": 3
+ },
+ "gpuPowerModel": {
+ "modelType": "linear",
+ "power": 400.0,
+ "idlePower": 100.0,
+ "maxPower": 200.0
+ },
+ "count": 4
+ }
+ ]
+ }
+ ]
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet
new file mode 100644
index 00000000..7dda2c97
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet
new file mode 100644
index 00000000..23331729
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet
Binary files differ
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
index 8792552e..c9e3ab8c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
@@ -211,27 +211,43 @@ public class SimMachine {
ResourceType.CPU,
new ArrayList<>(List.of(new SimCpu(engine, this.machineModel.getCpuModel(), cpuPowerModel, 0))));
- new FlowEdge((FlowConsumer) this.computeResources.get(ResourceType.CPU).getFirst(), this.psu);
+ // Connect the CPU to the PSU
+ new FlowEdge(
+ (FlowConsumer) this.computeResources.get(ResourceType.CPU).getFirst(),
+ this.psu,
+ ResourceType.POWER,
+ 0,
+ -1);
// Create a FlowDistributor and add the cpu as supplier
this.distributors.put(ResourceType.CPU, new FlowDistributor(engine));
- new FlowEdge(this.distributors.get(ResourceType.CPU), (FlowSupplier)
- this.computeResources.get(ResourceType.CPU).getFirst());
+ new FlowEdge(
+ this.distributors.get(ResourceType.CPU),
+ (FlowSupplier) this.computeResources.get(ResourceType.CPU).getFirst(),
+ ResourceType.CPU,
+ -1,
+ 0);
// TODO: include memory as flow node
this.memory = new Memory(engine, this.machineModel.getMemory());
if (this.availableResources.contains(ResourceType.GPU)) {
this.distributors.put(ResourceType.GPU, new FlowDistributor(engine));
- short i = 0;
ArrayList<ComputeResource> gpus = new ArrayList<>();
for (GpuModel gpuModel : machineModel.getGpuModels()) {
- SimGpu gpu = new SimGpu(engine, gpuModel, gpuPowerModel, i);
+ // create a new GPU
+ SimGpu gpu = new SimGpu(engine, gpuModel, gpuPowerModel, gpuModel.getId());
gpus.add(gpu);
- // suspends here without the distributor
- new FlowEdge(this.distributors.get(ResourceType.GPU), gpu);
- new FlowEdge(gpu, this.psu);
+ // Connect the GPU to the distributor
+ new FlowEdge(
+ this.distributors.get(ResourceType.GPU),
+ gpu,
+ ResourceType.GPU,
+ gpuModel.getId(),
+ gpuModel.getId());
+ // Connect the GPU to the PSU
+ new FlowEdge(gpu, this.psu, ResourceType.POWER, gpuModel.getId(), gpuModel.getId());
}
this.computeResources.put(ResourceType.GPU, gpus);
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java
index 874194f6..e11d9cf2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java
@@ -35,7 +35,6 @@ import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy
public final class MachineModel {
private final CpuModel cpuModel;
private final MemoryUnit memory;
- // private final List<GpuModel> gpuModels = new ArrayList<>(); // TODO: Implement multi GPU support
private final List<GpuModel> gpuModels;
private final DistributionPolicy cpuDistributionStrategy;
private final DistributionPolicy gpuDistributionPolicy;
@@ -58,19 +57,8 @@ public final class MachineModel {
this.gpuDistributionPolicy = gpuDistributionPolicy;
this.availableResources.add(ResourceType.CPU);
// TODO: Add Memory
- // this.usedResources.add(ResourceType.Memory);
if (gpuModels != null && !gpuModels.isEmpty()) {
- // this.gpuModels = gpuModels;
- this.gpuModels = new ArrayList<>();
- this.gpuModels.add(new GpuModel(
- 0,
- gpuModels.getFirst().getCoreCount() * gpuModels.size(), // merges multiple GPUs into one
- gpuModels.getFirst().getCoreSpeed(),
- gpuModels.getFirst().getMemoryBandwidth(),
- gpuModels.getFirst().getMemorySize() * gpuModels.size(), // merges multiple GPUs into one
- gpuModels.getFirst().getVendor(),
- gpuModels.getFirst().getModelName(),
- gpuModels.getFirst().getArchitecture()));
+ this.gpuModels = gpuModels;
this.availableResources.add(ResourceType.GPU);
} else {
this.gpuModels = new ArrayList<>();
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
index 1ea7c570..f40f4fec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
@@ -43,11 +43,14 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
private static final Logger LOGGER = LoggerFactory.getLogger(SimPsu.class);
private long lastUpdate;
- private final HashMap<ResourceType, ArrayList<Double>> powerDemandsPerResource = new HashMap<>();
- private final HashMap<ResourceType, ArrayList<Double>> powerSuppliedPerResource = new HashMap<>();
+ private final HashMap<ResourceType, HashMap<Integer, Double>> powerDemandsPerResource = new HashMap<>();
+ private final HashMap<ResourceType, HashMap<Integer, Double>> powerSuppliedPerResource = new HashMap<>();
+
+ private double totalPowerDemand = 0.0;
+ private double totalPowerSupplied = 0.0;
private double totalEnergyUsage = 0.0;
- private final HashMap<ResourceType, ArrayList<FlowEdge>> resourceEdges = new HashMap<>();
+ private final HashMap<ResourceType, HashMap<Integer, FlowEdge>> resourceEdges = new HashMap<>();
private FlowEdge powerSupplyEdge;
private final double capacity = Long.MAX_VALUE;
@@ -72,28 +75,51 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
* This method provides access to the power consumption of the machine before PSU losses are applied.
*/
public double getPowerDemand() {
- return this.powerDemandsPerResource.values().stream()
- .flatMap(List::stream)
- .findFirst()
- .orElse(0.0);
+ return this.totalPowerDemand;
}
+ /**
+ * Return the power demand of the machine (in W) measured in the PSU for a specific resource type.
+ * <p>
+ * This method provides access to the power consumption of the machine before PSU losses are applied.
+ */
public double getPowerDemand(ResourceType resourceType) {
- return this.powerDemandsPerResource.get(resourceType).getFirst();
+ // return this.powerDemandsPerResource.get(resourceType).stream().mapToDouble(Double::doubleValue).sum();
+ return this.powerDemandsPerResource.get(resourceType).values().stream()
+ .mapToDouble(Double::doubleValue)
+ .sum();
+ }
+
+ /**
+ * Return the power demand of the machine (in W) measured in the PSU for a specific resource type for a specific resource.
+ * <p>
+ * This method provides access to the power consumption of the machine before PSU losses are applied.
+ */
+ public double getPowerDemand(ResourceType resourceType, int id) {
+ return this.powerDemandsPerResource.get(resourceType).get(id);
}
/**
* Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply.
*/
public double getPowerDraw() {
- return this.powerSuppliedPerResource.values().stream()
- .flatMap(List::stream)
- .findFirst()
- .orElse(0.0);
+ return this.totalPowerSupplied;
}
+ /**
+ * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply for a specific resource type.
+ */
public double getPowerDraw(ResourceType resourceType) {
- return this.powerSuppliedPerResource.get(resourceType).getFirst();
+ return this.powerSuppliedPerResource.get(resourceType).values().stream()
+ .mapToDouble(Double::doubleValue)
+ .sum();
+ }
+
+ /**
+ * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply for a specific resource type for a specific resource.
+ */
+ public double getPowerDraw(ResourceType resourceType, int id) {
+ return this.powerSuppliedPerResource.get(resourceType).get(id);
}
/**
@@ -127,16 +153,22 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
updateCounters();
for (ResourceType resourceType : this.resourceEdges.keySet()) {
- ArrayList<FlowEdge> edges = this.resourceEdges.get(resourceType);
+ HashMap<Integer, FlowEdge> edges = this.resourceEdges.get(resourceType);
if (edges != null && !edges.isEmpty()) {
- double powerSupply =
- this.powerDemandsPerResource.get(resourceType).getFirst();
- double powerSupplied =
- this.powerSuppliedPerResource.get(resourceType).getFirst();
-
- if (powerSupply != powerSupplied) {
- for (FlowEdge edge : edges) {
- edge.pushSupply(powerSupply);
+ for (FlowEdge edge : edges.values()) {
+ // If the edge is null, it means that the edge has been removed -> no update is needed
+ if (edge == null) {
+ continue;
+ }
+
+ int consumerIndex = edge.getConsumerIndex() == -1 ? 0 : edge.getConsumerIndex();
+ double powerDemand =
+ this.powerDemandsPerResource.get(resourceType).get(consumerIndex);
+ double powerSupplied =
+ this.powerSuppliedPerResource.get(resourceType).get(consumerIndex);
+
+ if (powerDemand != powerSupplied) {
+ edge.pushSupply(powerDemand);
}
}
}
@@ -159,7 +191,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
long duration = now - lastUpdate;
if (duration > 0) {
for (ResourceType resourceType : this.powerSuppliedPerResource.keySet()) {
- for (double powerSupplied : this.powerSuppliedPerResource.get(resourceType)) {
+ for (double powerSupplied :
+ this.powerSuppliedPerResource.get(resourceType).values()) {
this.totalEnergyUsage += (powerSupplied * duration * 0.001);
}
}
@@ -171,17 +204,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand, ResourceType resourceType) {
- this.powerDemandsPerResource.put(resourceType, new ArrayList<>(List.of(newDemand)));
- powerSupplyEdge.pushDemand(newDemand);
- }
-
- @Override
public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
- double totalDemand = this.powerDemandsPerResource.values().stream()
- .flatMap(List::stream)
- .reduce(0.0, Double::sum);
- this.powerSupplyEdge.pushDemand(totalDemand);
+ this.powerSupplyEdge.pushDemand(newDemand);
}
@Override
@@ -191,7 +215,13 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply, ResourceType resourceType) {
- this.powerSuppliedPerResource.put(resourceType, new ArrayList<>(List.of(newSupply)));
+ int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex();
+
+ double previousSupply = this.powerSuppliedPerResource.get(resourceType).get(consumerIndex);
+ this.totalPowerSupplied += newSupply - previousSupply;
+
+ this.powerSuppliedPerResource.get(resourceType).put(consumerIndex, newSupply);
+
consumerEdge.pushSupply(newSupply, false, resourceType);
}
@@ -203,18 +233,29 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand, ResourceType resourceType) {
updateCounters();
- this.powerDemandsPerResource.put(resourceType, new ArrayList<>(List.of(newPowerDemand)));
+ int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex();
+
+ double previousPowerDemand =
+ this.powerDemandsPerResource.get(resourceType).get(consumerIndex);
+ this.totalPowerDemand += newPowerDemand - previousPowerDemand;
- pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand);
+ this.powerDemandsPerResource.get(resourceType).put(consumerIndex, newPowerDemand);
+
+ pushOutgoingDemand(this.powerSupplyEdge, totalPowerDemand);
}
@Override
public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
updateCounters();
for (ResourceType resourceType : this.resourceEdges.keySet()) {
- for (FlowEdge edge : this.resourceEdges.get(resourceType)) {
+ for (FlowEdge edge : this.resourceEdges.get(resourceType).values()) {
+ // If the edge is null, it means that the edge has been removed -> no update is needed
+ if (edge == null) {
+ continue;
+ }
+ int consumerIndex = edge.getConsumerIndex() == -1 ? 0 : edge.getConsumerIndex();
double outgoingSupply =
- Math.min(this.powerDemandsPerResource.get(resourceType).getFirst(), newSupply);
+ Math.min(this.powerDemandsPerResource.get(resourceType).get(consumerIndex), newSupply);
pushOutgoingSupply(edge, outgoingSupply, resourceType);
}
}
@@ -222,10 +263,19 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
+
ResourceType consumerResourceType = consumerEdge.getConsumerResourceType();
- this.resourceEdges.put(consumerResourceType, new ArrayList<>(List.of(consumerEdge)));
- this.powerDemandsPerResource.put(consumerResourceType, new ArrayList<>(List.of(0.0)));
- this.powerSuppliedPerResource.put(consumerResourceType, new ArrayList<>(List.of(0.0)));
+ int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex();
+
+ if (!this.resourceEdges.containsKey(consumerResourceType)) {
+ this.resourceEdges.put(consumerResourceType, new HashMap<>());
+ this.powerDemandsPerResource.put(consumerResourceType, new HashMap<>());
+ this.powerSuppliedPerResource.put(consumerResourceType, new HashMap<>());
+ }
+
+ this.resourceEdges.get(consumerResourceType).put(consumerIndex, consumerEdge);
+ this.powerDemandsPerResource.get(consumerResourceType).put(consumerIndex, 0.0);
+ this.powerSuppliedPerResource.get(consumerResourceType).put(consumerIndex, 0.0);
}
@Override
@@ -236,10 +286,18 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
ResourceType resourceType = consumerEdge.getConsumerResourceType();
+ int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex();
+
if (this.resourceEdges.containsKey(resourceType)) {
- this.resourceEdges.remove(resourceType);
- this.powerDemandsPerResource.remove(resourceType);
- this.powerSuppliedPerResource.remove(resourceType);
+ this.resourceEdges.get(resourceType).put(consumerIndex, null);
+
+ this.totalPowerDemand -=
+ this.powerDemandsPerResource.get(resourceType).get(consumerIndex);
+ this.powerDemandsPerResource.get(resourceType).put(consumerIndex, 0.0);
+
+ this.totalPowerSupplied -=
+ this.powerSuppliedPerResource.get(resourceType).get(consumerIndex);
+ this.powerSuppliedPerResource.get(resourceType).put(consumerIndex, 0.0);
}
}
@@ -252,7 +310,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() {
List<FlowEdge> supplyingEdges = new ArrayList<>();
for (ResourceType resourceType : this.resourceEdges.keySet()) {
- List<FlowEdge> edges = this.resourceEdges.get(resourceType);
+ List<FlowEdge> edges =
+ this.resourceEdges.get(resourceType).values().stream().toList();
if (edges != null && !edges.isEmpty()) {
supplyingEdges.addAll(edges);
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java
index 622d2b89..8922a97d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java
@@ -56,7 +56,7 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier {
private final Hashtable<ResourceType, Double> resourceCapacities = new Hashtable<>();
private final Hashtable<ResourceType, Double> resourceTimeScalingFactor = new Hashtable<>(); // formerly known as d
private final Hashtable<ResourceType, FlowEdge> distributorEdges = new Hashtable<>();
- private final Hashtable<ResourceType, List<PerformanceCounters>> resourcePerformanceCounters = new Hashtable<>();
+ private final Hashtable<ResourceType, PerformanceCounters> resourcePerformanceCounters = new Hashtable<>();
private final long checkpointInterval;
private final long checkpointDuration;
@@ -108,21 +108,11 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier {
}
public PerformanceCounters getCpuPerformanceCounters() {
- return this.resourcePerformanceCounters.get(ResourceType.CPU).getFirst();
+ return this.resourcePerformanceCounters.get(ResourceType.CPU);
}
- public List<PerformanceCounters> getGpuPerformanceCounters() {
- return this.resourcePerformanceCounters.get(ResourceType.GPU) != null
- ? this.resourcePerformanceCounters.get(ResourceType.GPU)
- : new ArrayList<>();
- }
-
- public PerformanceCounters getGpuPerformanceCounters(int gpuId) {
- List<PerformanceCounters> gpuPerformanceCounters = this.resourcePerformanceCounters.get(ResourceType.GPU);
- if (gpuId < 0 || gpuId >= gpuPerformanceCounters.size()) {
- throw new IndexOutOfBoundsException("No such GPU id: " + gpuId);
- }
- return gpuPerformanceCounters.get(gpuId);
+ public PerformanceCounters getGpuPerformanceCounters() {
+ return this.resourcePerformanceCounters.get(ResourceType.GPU);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -171,13 +161,9 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier {
this.resourceCapacities.put(resourceType, resources.getFirst().getCapacity());
- ArrayList<PerformanceCounters> performanceCounters = new ArrayList<>();
-
- for (ComputeResource resource : resources) {
- performanceCounters.add(new PerformanceCounters());
- this.resourceTimeScalingFactor.put(resourceType, 1.0 / resource.getCapacity());
- }
- this.resourcePerformanceCounters.put(resourceType, performanceCounters);
+ this.resourceTimeScalingFactor.put(
+ resourceType, 1.0 / resources.getFirst().getCapacity());
+ this.resourcePerformanceCounters.put(resourceType, new PerformanceCounters());
this.resourceDemands.put(resourceType, 0.0);
this.resourceSupplies.put(resourceType, 0.0);
}
@@ -225,21 +211,24 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier {
for (ResourceType resourceType : this.availableResources) {
int i = 0;
final double factor = this.resourceTimeScalingFactor.get(resourceType) * delta;
- for (PerformanceCounters performanceCounter : this.resourcePerformanceCounters.get(resourceType)) {
- if (delta > 0) {
- performanceCounter.addActiveTime(Math.round(this.resourceSupplies.get(resourceType) * factor));
- performanceCounter.setIdleTime(Math.round(
- (this.resourceCapacities.get(resourceType) - this.resourceSupplies.get(resourceType))
- * factor));
- performanceCounter.addStealTime(Math.round(
- (this.resourceDemands.get(resourceType) - this.resourceSupplies.get(resourceType))
- * factor));
- }
- performanceCounter.setDemand(this.resourceDemands.get(resourceType));
- performanceCounter.setSupply(this.resourceSupplies.get(resourceType));
- performanceCounter.setCapacity(this.resourceCapacities.get(resourceType));
- i++;
+ if (delta > 0) {
+ this.resourcePerformanceCounters
+ .get(resourceType)
+ .addActiveTime(Math.round(this.resourceSupplies.get(resourceType) * factor));
+ this.resourcePerformanceCounters
+ .get(resourceType)
+ .setIdleTime(Math.round(
+ (this.resourceCapacities.get(resourceType) - this.resourceSupplies.get(resourceType))
+ * factor));
+ this.resourcePerformanceCounters
+ .get(resourceType)
+ .addStealTime(Math.round(
+ (this.resourceDemands.get(resourceType) - this.resourceSupplies.get(resourceType))
+ * factor));
}
+ this.resourcePerformanceCounters.get(resourceType).setDemand(this.resourceDemands.get(resourceType));
+ this.resourcePerformanceCounters.get(resourceType).setSupply(this.resourceSupplies.get(resourceType));
+ this.resourcePerformanceCounters.get(resourceType).setCapacity(this.resourceCapacities.get(resourceType));
}
}
@@ -317,7 +306,12 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier {
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
ResourceType resourceType = supplierEdge.getSupplierResourceType();
- this.resourceCapacities.put(resourceType, supplierEdge.getCapacity());
+ if (this.resourceCapacities.containsKey(resourceType) && this.resourceCapacities.get(resourceType) > 0) {
+ this.resourceCapacities.put(
+ resourceType, this.resourceCapacities.get(resourceType) + supplierEdge.getCapacity());
+ } else {
+ this.resourceCapacities.put(resourceType, supplierEdge.getCapacity());
+ }
this.distributorEdges.put(resourceType, supplierEdge);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 674db8ca..f7fc2728 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -23,6 +23,7 @@
package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,20 +33,30 @@ import org.opendc.common.ResourceType;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy;
import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class);
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
- private FlowEdge supplierEdge;
+ private HashMap<Integer, FlowEdge> supplierEdges =
+ new HashMap<>(); // The suppliers that provide supply to this distributor
private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
private double totalIncomingDemand; // The total demand of all the consumers
- private double currentIncomingSupply; // The current supply provided by the supplier
+ // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable
+ private HashMap<Integer, Double> currentIncomingSupplies =
+ new HashMap<>(); // The current supply provided by the suppliers
+ private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers
private boolean outgoingDemandUpdateNeeded = false;
private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle
+ private ResourceType supplierResourceType;
+ private ResourceType consumerResourceType;
+
private boolean overloaded = false;
private double capacity; // What is the max capacity. Can probably be removed
@@ -66,7 +77,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
}
public double getCurrentIncomingSupply() {
- return currentIncomingSupply;
+ return this.totalIncomingSupply;
}
public double getCapacity() {
@@ -90,7 +101,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
}
private void updateOutgoingDemand() {
- this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand);
+ // equally distribute the demand to all suppliers
+ for (FlowEdge supplierEdge : this.supplierEdges.values()) {
+ this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size());
+ // alternatively a relative share could be used, based on capacity minus current incoming supply
+ // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() -
+ // currentIncomingSupplies.get(idx) / supplierEdges.size()));
+ }
this.outgoingDemandUpdateNeeded = false;
@@ -102,11 +119,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
// If the demand is higher than the current supply, the system is overloaded.
// The available supply is distributed based on the current distribution function.
- if (this.totalIncomingDemand > this.currentIncomingSupply) {
+ if (this.totalIncomingDemand > this.totalIncomingSupply) {
this.overloaded = true;
- double[] supplies =
- this.distributionPolicy.distributeSupply(this.incomingDemands, this.currentIncomingSupply);
+ double[] supplies = this.distributionPolicy.distributeSupply(
+ this.incomingDemands,
+ new ArrayList<>(this.currentIncomingSupplies.values()),
+ this.totalIncomingSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
@@ -151,13 +170,18 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.consumerEdges.add(consumerEdge);
this.incomingDemands.add(0.0);
this.outgoingSupplies.add(0.0);
+ this.consumerResourceType = consumerEdge.getConsumerResourceType();
}
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
- this.supplierEdge = supplierEdge;
- this.capacity = supplierEdge.getCapacity();
- this.currentIncomingSupply = 0;
+ // supplierIndex not always set, so we use 0 as default to avoid index out of bounds
+ int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
+
+ this.supplierEdges.put(idx, supplierEdge);
+ this.capacity += supplierEdge.getCapacity();
+ this.currentIncomingSupplies.put(idx, 0.0);
+ this.supplierResourceType = supplierEdge.getSupplierResourceType();
}
@Override
@@ -202,13 +226,16 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
- this.supplierEdge = null;
- this.capacity = 0;
- this.currentIncomingSupply = 0;
-
- this.updatedDemands.clear();
-
- this.closeNode();
+ // supplierIndex not always set, so we use 0 as default to avoid index out of bounds
+ int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
+ // to keep index consistent, entries are neutralized instead of removed
+ this.supplierEdges.put(idx, null);
+ this.capacity -= supplierEdge.getCapacity();
+ this.currentIncomingSupplies.put(idx, 0.0);
+
+ if (this.supplierEdges.isEmpty()) {
+ this.updatedDemands.clear();
+ }
}
@Override
@@ -216,7 +243,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
int idx = consumerEdge.getConsumerIndex();
if (idx == -1) {
- System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer");
+ LOGGER.warn("Demand {} pushed by an unknown consumer", newDemand);
return;
}
@@ -224,6 +251,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
double prevDemand = incomingDemands.get(idx);
incomingDemands.set(idx, newDemand);
+ // only update the total supply if the new supply is different from the previous one
this.totalIncomingDemand += (newDemand - prevDemand);
this.updatedDemands.add(idx);
@@ -243,14 +271,20 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
@Override
public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
- this.currentIncomingSupply = newSupply;
+ // supplierIndex not always set, so we use 0 as default to avoid index out of bounds
+ int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
+ double prevSupply = currentIncomingSupplies.get(idx);
+
+ currentIncomingSupplies.put(idx, newSupply);
+ // only update the total supply if the new supply is different from the previous one
+ this.totalIncomingSupply += (newSupply - prevSupply);
this.invalidate();
}
@Override
public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
- this.supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType());
+ supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType());
}
@Override
@@ -267,23 +301,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
outgoingSupplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType());
- consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType());
}
@Override
public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() {
- List<FlowEdge> supplyingEdges = (this.supplierEdge != null) ? List.of(this.supplierEdge) : List.of();
-
- return Map.of(FlowEdge.NodeType.CONSUMING, supplyingEdges, FlowEdge.NodeType.SUPPLYING, this.consumerEdges);
+ return Map.of(
+ FlowEdge.NodeType.CONSUMING,
+ this.consumerEdges,
+ FlowEdge.NodeType.SUPPLYING,
+ new ArrayList<>(this.supplierEdges.values()));
}
@Override
public ResourceType getSupplierResourceType() {
- return this.supplierEdge.getSupplierResourceType();
+ // return this.supplierEdge.getSupplierResourceType();
+ return this.supplierResourceType;
}
@Override
public ResourceType getConsumerResourceType() {
- return this.consumerEdges.getFirst().getConsumerResourceType();
+ return this.consumerResourceType;
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
index aa3894c1..db2a2944 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
@@ -54,6 +54,15 @@ public class FlowEdge {
}
public FlowEdge(FlowConsumer consumer, FlowSupplier supplier, ResourceType resourceType) {
+ this(consumer, supplier, resourceType, -1, -1);
+ }
+
+ public FlowEdge(
+ FlowConsumer consumer,
+ FlowSupplier supplier,
+ ResourceType resourceType,
+ int consumerIndex,
+ int supplierIndex) {
if (!(consumer instanceof FlowNode)) {
throw new IllegalArgumentException("Flow consumer is not a FlowNode");
}
@@ -67,6 +76,10 @@ public class FlowEdge {
this.capacity = supplier.getCapacity(resourceType);
+ // to avoid race condition of setting indices and requiring them in the PSU
+ this.supplierIndex = supplierIndex;
+ this.consumerIndex = consumerIndex;
+
this.consumer.addSupplierEdge(this);
this.supplier.addConsumerEdge(this);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java
index 9d2246cd..3a8bebbc 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java
@@ -25,5 +25,5 @@ package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
public interface DistributionPolicy {
- double[] distributeSupply(ArrayList<Double> supply, double currentSupply);
+ double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java
index 40d70b5e..baa04975 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java
@@ -42,7 +42,7 @@ public class FixedShare implements DistributionPolicy {
}
@Override
- public double[] distributeSupply(ArrayList<Double> supply, double currentSupply) {
+ public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) {
return new double[0];
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java
index 1d387349..484e7fe4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java
@@ -34,7 +34,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy {
private record Demand(int idx, double value) {}
@Override
- public double[] distributeSupply(ArrayList<Double> demands, double currentSupply) {
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
int inputSize = demands.size();
final double[] supplies = new double[inputSize];
@@ -50,7 +50,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy {
return i1.compareTo(i2);
});
- double availableCapacity = currentSupply; // totalSupply
+ double availableCapacity = totalSupply;
for (int i = 0; i < inputSize; i++) {
double d = tempDemands[i].value;
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
index e7d35630..53e594de 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
@@ -110,6 +110,12 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
Types
.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
.named("cpuUsage"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpuCount"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpuUsage"),
)
.named("resource_state")
@@ -137,6 +143,12 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
Types
.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
.named("cpu_usage"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpu_count"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_usage"),
)
.named("resource_state")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
index 9ad786d5..a53dcdb2 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -87,7 +87,7 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
localCpuUsage = value
}
}
- "gpu_count", "gpu_cores" ->
+ "gpu_count", "gpuCount", "gpu_cores", "gpuCores" ->
object : PrimitiveConverter() {
override fun addInt(value: Int) {
localGpuCount = value