diff options
32 files changed, 553 insertions, 386 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java index e42d7704..e7790975 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostGpuStats.java @@ -43,4 +43,5 @@ public record HostGpuStats( double capacity, double demand, double usage, - double utilization) {} + double utilization, + double powerDraw) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index effe3d5b..b7d3b730 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -361,6 +361,7 @@ public class SimHost( for (gpu in simMachine!!.gpus) { gpu.updateCounters(this.clock.millis()) val counters = simMachine!!.getGpuPerformanceCounters(gpu.id) + val powerDraw = simMachine!!.psu.getPowerDraw(ResourceType.GPU, gpu.id) gpuStats.add( HostGpuStats( @@ -372,13 +373,14 @@ public class SimHost( counters.demand, counters.supply, counters.supply / gpu.getCapacity(ResourceType.GPU), + powerDraw, ), ) } return gpuStats } - public fun getGpuStats(task: ServiceTask): List<GuestGpuStats> { + public fun getGpuStats(task: ServiceTask): GuestGpuStats? 
{ val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } return guest.getGpuStats() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index a980f6cb..40de94bb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -258,27 +258,24 @@ public class Guest( ) } - public fun getGpuStats(): List<GuestGpuStats> { + public fun getGpuStats(): GuestGpuStats? { virtualMachine!!.updateCounters(this.clock.millis()) val counters = virtualMachine!!.gpuPerformanceCounters - val gpuStats = mutableListOf<GuestGpuStats>() - for (gpuCounter in counters) { - gpuStats.add( - GuestGpuStats( - gpuCounter.activeTime / 1000L, - gpuCounter.idleTime / 1000L, - gpuCounter.stealTime / 1000L, - gpuCounter.lostTime / 1000L, - gpuCounter.capacity, - gpuCounter.supply, - gpuCounter.demand, - // Assuming similar scaling as CPU - gpuCounter.supply / gpuLimit, - ), + return if (counters == null) { + null + } else { + GuestGpuStats( + counters.activeTime / 1000L, + counters.idleTime / 1000L, + counters.stealTime / 1000L, + counters.lostTime / 1000L, + counters.capacity, + counters.supply, + counters.demand, + counters.supply / gpuLimit, ) } - return gpuStats } /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt index c7549433..43b9b449 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt +++ 
b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt @@ -217,6 +217,11 @@ public data class ComputeExportConfig( } } +public fun ComputeExportConfig.withGpuColumns(count: Int): ComputeExportConfig { + val hostCols = hostExportColumns + DfltHostExportColumns.gpuColumns(count) + return copy(hostExportColumns = hostCols) +} + private val json = Json { ignoreUnknownKeys = true } private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> = diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt index affaab58..7a091379 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt @@ -144,48 +144,6 @@ public object DfltHostExportColumns { field = Types.required(INT64).named("cpu_time_lost"), ) { it.cpuLostTime } - // TODO: support multiple GPUs - - public val GPU_CAPACITY: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(FLOAT).named("gpu_capacity"), - ) { it.gpuCapacities.getOrNull(0) } - - public val GPU_USAGE: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(FLOAT).named("gpu_usage"), - ) { it.gpuUsages.getOrNull(0) } - - public val GPU_DEMAND: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(FLOAT).named("gpu_demand"), - ) { it.gpuDemands.getOrNull(0) } - - public val GPU_UTILIZATION: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(FLOAT).named("gpu_utilization"), - ) { it.gpuUtilizations.getOrNull(0) } - - public val GPU_TIME_ACTIVE: 
ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(INT64).named("gpu_time_active"), - ) { it.gpuActiveTimes.getOrNull(0) } - - public val GPU_TIME_IDLE: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(INT64).named("gpu_time_idle"), - ) { it.gpuIdleTimes.getOrNull(0) } - - public val GPU_TIME_STEAL: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(INT64).named("gpu_time_steal"), - ) { it.gpuStealTimes.getOrNull(0) } - - public val GPU_TIME_LOST: ExportColumn<HostTableReader> = - ExportColumn( - field = Types.optional(INT64).named("gpu_time_lost"), - ) { it.gpuLostTimes.getOrNull(0) } - public val POWER_DRAW: ExportColumn<HostTableReader> = ExportColumn( field = Types.required(FLOAT).named("power_draw"), @@ -217,6 +175,42 @@ public object DfltHostExportColumns { ) { it.bootTime?.toEpochMilli() } /** + * Returns GPU-related export columns for the given number of GPUs. + */ + public fun gpuColumns(count: Int): Set<ExportColumn<HostTableReader>> = + (0 until count).flatMap { i -> + listOf<ExportColumn<HostTableReader>>( + ExportColumn( + field = Types.optional(FLOAT).named("gpu_capacity_$i"), + ) { it.gpuCapacities.getOrNull(i) }, + ExportColumn( + field = Types.optional(FLOAT).named("gpu_usage_$i"), + ) { it.gpuUsages.getOrNull(i) }, + ExportColumn( + field = Types.optional(FLOAT).named("gpu_demand_$i"), + ) { it.gpuDemands.getOrNull(i) }, + ExportColumn( + field = Types.optional(FLOAT).named("gpu_utilization_$i"), + ) { it.gpuUtilizations.getOrNull(i) }, + ExportColumn( + field = Types.optional(INT64).named("gpu_time_active_$i"), + ) { it.gpuActiveTimes.getOrNull(i) }, + ExportColumn( + field = Types.optional(INT64).named("gpu_time_idle_$i"), + ) { it.gpuIdleTimes.getOrNull(i) }, + ExportColumn( + field = Types.optional(INT64).named("gpu_time_steal_$i"), + ) { it.gpuStealTimes.getOrNull(i) }, + ExportColumn( + field = Types.optional(INT64).named("gpu_time_lost_$i"), + ) { 
it.gpuLostTimes.getOrNull(i) }, + ExportColumn( + field = Types.optional(FLOAT).named("gpu_power_draw_$i"), + ) { it.gpuPowerDraws.getOrNull(i) }, + ) + }.toSet() + + /** * The columns that are always included in the output file. */ internal val BASE_EXPORT_COLUMNS = diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt index ad7a1d52..07750114 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt @@ -132,42 +132,45 @@ public object DfltTaskExportColumns { field = Types.required(INT64).named("cpu_time_lost"), ) { it.cpuLostTime } - // TODO: support multiple GPUs + public val GPU_COUNT: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT32).named("gpu_count"), + ) { it.taskInfo.gpuCount } - public val GPU_CAPACITY: ExportColumn<TaskTableReader> = + public val GPU_LIMIT: ExportColumn<TaskTableReader> = ExportColumn( - field = Types.optional(FLOAT).named("gpu_capacity"), - ) { it.gpuLimits?.getOrNull(0) } + field = Types.optional(FLOAT).named("gpu_limit"), + ) { it.gpuLimit } public val GPU_USAGE: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(FLOAT).named("gpu_usage"), - ) { it.gpuUsages?.getOrNull(0) } + ) { it.gpuUsage } public val GPU_DEMAND: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(FLOAT).named("gpu_demand"), - ) { it.gpuDemands?.getOrNull(0) } + ) { it.gpuDemand } public val GPU_TIME_ACTIVE: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(INT64).named("gpu_time_active"), - ) { it.gpuActiveTimes?.getOrNull(0) } + ) { it.gpuActiveTime } 
public val GPU_TIME_IDLE: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(INT64).named("gpu_time_idle"), - ) { it.gpuIdleTimes?.getOrNull(0) } + ) { it.gpuIdleTime } public val GPU_TIME_STEAL: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(INT64).named("gpu_time_steal"), - ) { it.gpuStealTimes?.getOrNull(0) } + ) { it.gpuStealTime } public val GPU_TIME_LOST: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(INT64).named("gpu_time_lost"), - ) { it.gpuLostTimes?.getOrNull(0) } + ) { it.gpuLostTime } public val UP_TIME: ExportColumn<TaskTableReader> = ExportColumn( diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt index fbffd508..f904ac9e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReader.kt @@ -113,16 +113,11 @@ public interface HostTableReader : Exportable { public val cpuLostTime: Long /** - * The capacity of the CPUs in the host (in MHz). + * The capacity of the GPUs in the host (in MHz). They are inserted by GPU ID. */ public val gpuCapacities: ArrayList<Double> /** - * The capacity of the GPUs in the host (in MHz). They inserted by GPU ID. - */ - public val gpuLimits: ArrayList<Double> - - /** * The usage per GPU in the host (in MHz). They inserted by GPU ID */ public val gpuUsages: ArrayList<Double> @@ -158,6 +153,11 @@ public interface HostTableReader : Exportable { public val gpuLostTimes: ArrayList<Long> /** + * The power draw of the respective GPU in the host (in W). They are inserted by GPU ID. 
+ */ + public val gpuPowerDraws: ArrayList<Double> + + /** * The current power draw of the host in W. */ public val powerDraw: Double diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt index cb25358a..e46edc88 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/host/HostTableReaderImpl.kt @@ -60,7 +60,6 @@ public class HostTableReaderImpl( _cpuLostTime = table.cpuLostTime // GPU stats _gpuCapacities = table.gpuCapacities - _gpuLimits = table.gpuLimits _gpuDemands = table.gpuDemands _gpuUsages = table.gpuUsages _gpuUtilizations = table.gpuUtilizations @@ -68,6 +67,7 @@ public class HostTableReaderImpl( _gpuIdleTimes = table.gpuIdleTimes _gpuStealTimes = table.gpuStealTimes _gpuLostTimes = table.gpuLostTimes + _gpuPowerDraws = table.gpuPowerDraws // energy & carbon stats _powerDraw = table.powerDraw @@ -152,10 +152,6 @@ public class HostTableReaderImpl( get() = _gpuCapacities private var _gpuCapacities: ArrayList<Double> = ArrayList() - override val gpuLimits: ArrayList<Double> - get() = _gpuLimits - private var _gpuLimits: ArrayList<Double> = ArrayList() - override val gpuUsages: ArrayList<Double> get() = _gpuUsages private var _gpuUsages: ArrayList<Double> = ArrayList() @@ -170,7 +166,6 @@ public class HostTableReaderImpl( // half of the CPU stats override val gpuActiveTimes: ArrayList<Long> -// get() = _gpuActiveTimes.zip(previousGpuActiveTimes) { current, previous -> current - previous} as ArrayList<Long> get() = (0 until _gpuActiveTimes.size).map { i -> @@ -180,7 +175,6 @@ public class HostTableReaderImpl( private var previousGpuActiveTimes: 
ArrayList<Long> = ArrayList() override val gpuIdleTimes: ArrayList<Long> -// get() = _gpuIdleTimes.zip(previousGpuIdleTimes) { current, previous -> current - previous} as ArrayList<Long> get() = (0 until _gpuIdleTimes.size).map { i -> @@ -207,6 +201,10 @@ public class HostTableReaderImpl( private var _gpuLostTimes: ArrayList<Long> = ArrayList() private var previousGpuLostTimes: ArrayList<Long> = ArrayList() + override val gpuPowerDraws: ArrayList<Double> + get() = _gpuPowerDraws + private var _gpuPowerDraws: ArrayList<Double> = ArrayList() + override val powerDraw: Double get() = _powerDraw private var _powerDraw = 0.0 @@ -258,7 +256,7 @@ public class HostTableReaderImpl( _cpuStealTime = hostCpuStats.stealTime _cpuLostTime = hostCpuStats.lostTime // GPU stats - _gpuLimits = hostGpuStats.map { it.capacity } as ArrayList<Double> + _gpuCapacities = hostGpuStats.map { it.capacity } as ArrayList<Double> _gpuDemands = hostGpuStats.map { it.demand } as ArrayList<Double> _gpuUsages = hostGpuStats.map { it.usage } as ArrayList<Double> _gpuUtilizations = hostGpuStats.map { it.utilization } as ArrayList<Double> @@ -266,6 +264,7 @@ public class HostTableReaderImpl( _gpuIdleTimes = hostGpuStats.map { it.idleTime } as ArrayList<Long> _gpuStealTimes = hostGpuStats.map { it.stealTime } as ArrayList<Long> _gpuLostTimes = hostGpuStats.map { it.lostTime } as ArrayList<Long> + _gpuPowerDraws = hostGpuStats.map { it.powerDraw } as ArrayList<Double> // energy & carbon stats _powerDraw = hostSysStats.powerDraw _energyUsage = hostSysStats.energyUsage diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt index e0b28379..c1a14613 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt +++ 
b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskInfo.kt @@ -32,4 +32,5 @@ public data class TaskInfo( val arch: String, val cpuCount: Int, val memCapacity: Long, + val gpuCount: Int, ) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt index f71587c7..e3860606 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt @@ -133,37 +133,37 @@ public interface TaskTableReader : Exportable { /** * The capacity of the GPUs of Host on which the task is running (in MHz). */ - public val gpuLimits: DoubleArray? + public val gpuLimit: Double? /** * The amount of GPus allocated to the task (in MHz). */ - public val gpuUsages: DoubleArray? + public val gpuUsage: Double? /** * The GPU demanded by this task (in MHz). */ - public val gpuDemands: DoubleArray? + public val gpuDemand: Double? /** * The duration (in seconds) that a GPU was active in the task. */ - public val gpuActiveTimes: LongArray? + public val gpuActiveTime: Long? /** * The duration (in seconds) that a GPU was idle in the task. */ - public val gpuIdleTimes: LongArray? + public val gpuIdleTime: Long? /** * The duration (in seconds) that a vGPU wanted to run, but no capacity was available. */ - public val gpuStealTimes: LongArray? + public val gpuStealTime: Long? /** * The duration (in seconds) of GPU time that was lost due to interference. */ - public val gpuLostTimes: LongArray? + public val gpuLostTime: Long? 
/** * The state of the task diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt index 6128c9a2..ce62fdb0 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt @@ -64,13 +64,13 @@ public class TaskTableReaderImpl( _cpuStealTime = table.cpuStealTime _cpuLostTime = table.cpuLostTime // GPU stats - _gpuLimits = table.gpuLimits - _gpuDemands = table.gpuDemands - _gpuUsages = table.gpuUsages - _gpuActiveTimes = table.gpuActiveTimes - _gpuIdleTimes = table.gpuIdleTimes - _gpuStealTimes = table.gpuStealTimes - _gpuLostTimes = table.gpuLostTimes + _gpuLimit = table.gpuLimit + _gpuDemand = table.gpuDemand + _gpuUsage = table.gpuUsage + _gpuActiveTime = table.gpuActiveTime + _gpuIdleTime = table.gpuIdleTime + _gpuStealTime = table.gpuStealTime + _gpuLostTime = table.gpuLostTime _uptime = table.uptime _downtime = table.downtime @@ -95,6 +95,7 @@ public class TaskTableReaderImpl( "x86", task.flavor.cpuCoreCount, task.flavor.memorySize, + task.flavor.gpuCoreCount, ) /** @@ -177,73 +178,37 @@ public class TaskTableReaderImpl( private var _cpuLostTime = 0L private var previousCpuLostTime = 0L - override val gpuLimits: DoubleArray? - get() = _gpuLimits ?: DoubleArray(0) - private var _gpuLimits: DoubleArray? = null + override val gpuLimit: Double? + get() = _gpuLimit + private var _gpuLimit: Double? = 0.0 - override val gpuUsages: DoubleArray? - get() = _gpuUsages ?: DoubleArray(0) - private var _gpuUsages: DoubleArray? = null + override val gpuUsage: Double? + get() = _gpuUsage + private var _gpuUsage: Double? 
= 0.0 - override val gpuDemands: DoubleArray? - get() = _gpuDemands ?: DoubleArray(0) - private var _gpuDemands: DoubleArray? = null + override val gpuDemand: Double? + get() = _gpuDemand + private var _gpuDemand: Double? = 0.0 - override val gpuActiveTimes: LongArray? - get() { - val current = _gpuActiveTimes ?: return LongArray(0) - val previous = previousGpuActiveTimes + override val gpuActiveTime: Long? + get() = (_gpuActiveTime ?: 0L) - (previousGpuActiveTime ?: 0L) + private var _gpuActiveTime: Long? = null + private var previousGpuActiveTime: Long? = null - return if (previous == null || current.size != previous.size) { // not sure if I like the second clause - current - } else { - LongArray(current.size) { i -> current[i] - previous[i] } - } - } - private var _gpuActiveTimes: LongArray? = null - private var previousGpuActiveTimes: LongArray? = null - - override val gpuIdleTimes: LongArray? - get() { - val current = _gpuIdleTimes ?: return LongArray(0) - val previous = previousGpuIdleTimes - - return if (previous == null || current.size != previous.size) { // not sure if I like the second clause - current - } else { - LongArray(current.size) { i -> current[i] - previous[i] } - } - } - private var _gpuIdleTimes: LongArray? = null - private var previousGpuIdleTimes: LongArray? = null - - override val gpuStealTimes: LongArray? - get() { - val current = _gpuStealTimes ?: return LongArray(0) - val previous = previousGpuStealTimes - - return if (previous == null || current.size != previous.size) { - current - } else { - LongArray(current.size) { i -> current[i] - previous[i] } - } - } - private var _gpuStealTimes: LongArray? = null - private var previousGpuStealTimes: LongArray? = null - - override val gpuLostTimes: LongArray? 
- get() { - val current = _gpuLostTimes ?: return LongArray(0) - val previous = previousGpuLostTimes - - return if (previous == null || current.size != previous.size) { - current - } else { - LongArray(current.size) { i -> current[i] - previous[i] } - } - } - private var _gpuLostTimes: LongArray? = null - private var previousGpuLostTimes: LongArray? = null + override val gpuIdleTime: Long? + get() = (_gpuIdleTime ?: 0L) - (previousGpuIdleTime ?: 0L) + private var _gpuIdleTime: Long? = null + private var previousGpuIdleTime: Long? = null + + override val gpuStealTime: Long? + get() = (_gpuStealTime ?: 0L) - (previousGpuStealTime ?: 0L) + private var _gpuStealTime: Long? = null + private var previousGpuStealTime: Long? = null + + override val gpuLostTime: Long? + get() = (_gpuLostTime ?: 0L) - (previousGpuLostTime ?: 0L) + private var _gpuLostTime: Long? = null + private var previousGpuLostTime: Long? = null override val taskState: TaskState? get() = _taskState @@ -292,24 +257,14 @@ public class TaskTableReaderImpl( _scheduleTime = task.scheduledAt _finishTime = task.finishedAt - if (gpuStats != null && gpuStats.isNotEmpty()) { - val size = gpuStats.size - _gpuLimits = DoubleArray(size) { i -> gpuStats[i].capacity } - _gpuDemands = DoubleArray(size) { i -> gpuStats[i].demand } - _gpuUsages = DoubleArray(size) { i -> gpuStats[i].usage } - _gpuActiveTimes = LongArray(size) { i -> gpuStats[i].activeTime } - _gpuIdleTimes = LongArray(size) { i -> gpuStats[i].idleTime } - _gpuStealTimes = LongArray(size) { i -> gpuStats[i].stealTime } - _gpuLostTimes = LongArray(size) { i -> gpuStats[i].lostTime } - } else { - _gpuIdleTimes = null - _gpuStealTimes = null - _gpuLostTimes = null - _gpuIdleTimes = null - _gpuLimits = null - _gpuUsages = null - _gpuDemands = null - _gpuActiveTimes = null + if (gpuStats != null) { + _gpuLimit = gpuStats.capacity + _gpuDemand = gpuStats.demand + _gpuUsage = gpuStats.usage + _gpuActiveTime = gpuStats.activeTime + _gpuIdleTime = gpuStats.idleTime 
+ _gpuStealTime = gpuStats.stealTime + _gpuLostTime = gpuStats.lostTime } _taskState = task.state @@ -325,10 +280,10 @@ public class TaskTableReaderImpl( previousCpuIdleTime = _cpuIdleTime previousCpuStealTime = _cpuStealTime previousCpuLostTime = _cpuLostTime - previousGpuActiveTimes = _gpuActiveTimes - previousGpuIdleTimes = _gpuIdleTimes - previousGpuStealTimes = _gpuStealTimes - previousGpuLostTimes = _gpuLostTimes + previousGpuActiveTime = _gpuActiveTime + previousGpuIdleTime = _gpuIdleTime + previousGpuStealTime = _gpuStealTime + previousGpuLostTime = _gpuLostTime simHost = null _cpuLimit = 0.0 diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt index b52608a9..09a8fe64 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt @@ -157,6 +157,7 @@ private fun ClusterJSONSpec.toClusterSpec(): ClusterSpec { * Helper method to convert a [HostJSONSpec] into a [HostSpec]s. 
*/ private var globalCoreId = 0 +private var globalGpuId = 0 private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec { val units = @@ -172,7 +173,7 @@ private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec { val gpuUnits = List(gpu?.count ?: 0) { GpuModel( - globalCoreId++, + globalGpuId++, gpu!!.coreCount, gpu.coreSpeed.toMHz(), gpu.memoryBandwidth.toKibps(), diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index f7c444ea..e14a06cc 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -30,7 +30,9 @@ import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor +import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns import org.opendc.compute.topology.clusterTopology import org.opendc.experiments.base.experiment.Scenario import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec @@ -125,7 +127,17 @@ public fun runScenario( setupHosts(serviceDomain, topology, startTimeLong), ) - addExportModel(provisioner, serviceDomain, scenario, seed, startTime, scenario.id) + val gpuCount = topology.flatMap { it.hostSpecs }.maxOfOrNull { it.model.gpuModels.size } ?: 0 + addExportModel( + provisioner, + serviceDomain, + scenario, + seed, + startTime, + scenario.id, + computeExportConfig = + 
scenario.exportModelSpec.computeExportConfig.withGpuColumns(gpuCount), + ) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! service.setTasksExpected(workload.size) @@ -180,6 +192,7 @@ public fun addExportModel( seed: Long, startTime: Duration, index: Int, + computeExportConfig: ComputeExportConfig = scenario.exportModelSpec.computeExportConfig, ) { provisioner.runStep( registerComputeMonitor( @@ -189,7 +202,7 @@ public fun addExportModel( "seed=$seed", bufferSize = 4096, scenario.exportModelSpec.filesToExportDict, - computeExportConfig = scenario.exportModelSpec.computeExportConfig, + computeExportConfig = computeExportConfig, ), Duration.ofSeconds(scenario.exportModelSpec.exportInterval), startTime, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt index 6365f60d..cd16f174 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt @@ -43,4 +43,18 @@ class ExperimentRunnerTest { val someDir = File("output") someDir.deleteRecursively() } + + /** + * ExperimentRunner test 2 + * This test runs the experiment defined in the experiment_2.json file. + * + * In this test, part of the Marconi 100 workload is executed. This trace contains GPU tasks. 
+ */ + @Test + fun testExperimentRunner2() { + ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_2.json")) + + val someDir = File("output") + someDir.deleteRecursively() + } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt index 7b7b23d2..606ba571 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -595,10 +595,10 @@ class FlowDistributorTest { { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task - { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(10)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(10)?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(10)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(10)) { "The gpu used by task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(2000.0, 
monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, @@ -643,10 +643,10 @@ class FlowDistributorTest { { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task - { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostGpuSupplied["H01"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, @@ -691,10 +691,10 @@ class FlowDistributorTest { { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task - { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { 
assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } }, // host { assertEquals(2000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, @@ -737,10 +737,10 @@ class FlowDistributorTest { { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task - { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } 
}, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, @@ -799,17 +799,17 @@ class FlowDistributorTest { { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task 0 - { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } }, // task 1 - { assert(monitor.taskGpuDemands["1"]?.get(0)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(10)?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, - { assert(monitor.taskGpuDemands["1"]?.get(19)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } }, - { assert(monitor.taskGpuSupplied["1"]?.get(0)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(10)?.get(0)) { "The gpu used by task 1 is incorrect" } }, - { assert(monitor.taskGpuSupplied["1"]?.get(19)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuDemands["1"]?.get(0)) { "The gpu demanded by 
task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(10)) { "The gpu demanded by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(19)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied["1"]?.get(0)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(10)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(19)) { "The gpu used by task 1 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, @@ -865,15 +865,15 @@ class FlowDistributorTest { { assertEquals(0.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, // GPU // task 0 - { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(0)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assert(monitor.taskGpuDemands["0"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(0)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assert(monitor.taskGpuSupplied["0"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuDemands["0"]?.get(9)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(0.0, monitor.taskGpuSupplied["0"]?.get(9)) { "The gpu used by task 0 is incorrect" } }, // task 1 - { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(0)?.get(0)) { "The gpu demanded by task 1 is 
incorrect" } }, - { assert(monitor.taskGpuDemands["1"]?.get(9)?.isEmpty() ?: false) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(0)?.get(0)) { "The gpu used by task 1 is incorrect" } }, - { assert(monitor.taskGpuSupplied["1"]?.get(9)?.isEmpty() ?: false) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuDemands["1"]?.get(9)) { "The gpu demanded by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(0)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskGpuSupplied["1"]?.get(9)) { "The gpu used by task 1 is incorrect" } }, // host { assertEquals(1000.0, monitor.hostGpuDemands["H01"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, { assertEquals(0.0, monitor.hostGpuDemands["H01"]?.get(10)?.get(0)) { "The gpu demanded by the host is incorrect" } }, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt index 6e5a6b5e..2778e613 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/GpuTest.kt @@ -119,17 +119,16 @@ class GpuTest { @Test fun testGpuHostCreationMultiMinimal() { val topology = createTopology("Gpus/multi_gpu_no_vendor_no_memory.json") - val count = 3 assertGpuConfiguration( topology, - coreCount = 1 * count, + coreCount = 1, coreSpeed = 2000.0, - memorySize = -1L * count, + memorySize = -1L, memoryBandwidth = -1.0, vendor = "unknown", modelName = "unknown", architecture = "unknown", - gpuCount = 1, + gpuCount = 3, ) } @@ -139,18 +138,17 @@ class GpuTest { @Test fun 
testGpuHostCreationMultiWithMemoryNoVendor() { val topology = createTopology("Gpus/multi_gpu_no_vendor.json") - val count = 100 assertGpuConfiguration( topology, - coreCount = 1 * count, + coreCount = 1, coreSpeed = 2000.0, - memorySize = 4096L * count, + memorySize = 4096L, memoryBandwidth = 500.0, vendor = "unknown", modelName = "unknown", architecture = "unknown", - gpuCount = 1, + gpuCount = 100, ) } @@ -160,17 +158,16 @@ class GpuTest { @Test fun testGpuHostCreationMultiNoMemoryWithVendor() { val topology = createTopology("Gpus/multi_gpu_no_memory.json") - val count = 2 assertGpuConfiguration( topology, - coreCount = 1 * count, + coreCount = 1, coreSpeed = 2000.0, - memorySize = -1L * count, + memorySize = -1L, memoryBandwidth = -1.0, vendor = "NVIDIA", modelName = "Tesla V100", architecture = "Volta", - gpuCount = 1, + gpuCount = 2, ) } @@ -180,19 +177,20 @@ class GpuTest { @Test fun testGpuHostCreationMultiWithMemoryWithVendor() { val topology = createTopology("Gpus/multi_gpu_full.json") + // temporary implementation, to account for GPU concatenation val count = 5 assertGpuConfiguration( topology, // cuda cores - coreCount = 5120 * count, + coreCount = 5120, // fictional value coreSpeed = 5000.0, - memorySize = 30517578125 * count, + memorySize = 30517578125, memoryBandwidth = 7031250000.0, vendor = "NVIDIA", modelName = "Tesla V100", architecture = "Volta", - gpuCount = 1, + gpuCount = 5, ) } @@ -243,20 +241,26 @@ class GpuTest { { assertEquals(2000.0, monitor.hostCpuSupplied["DualGpuHost"]?.get(9)) { "The cpu used by the host is incorrect" } }, // GPU // task 0 - { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(8)?.get(0)) { "The gpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1)?.get(0)) { "The gpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, 
monitor.taskGpuSupplied["0"]?.get(8)?.get(0)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(1)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["0"]?.get(8)) { "The gpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(1)) { "The gpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["0"]?.get(8)) { "The gpu used by task 0 is incorrect" } }, // task 1 - { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1)?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(8)?.get(0)) { "The gpu demanded by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(1)?.get(0)) { "The gpu used by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(8)?.get(0)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(1)) { "The gpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuDemands["1"]?.get(8)) { "The gpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(1)) { "The gpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskGpuSupplied["1"]?.get(8)) { "The gpu used by task 1 is incorrect" } }, // host - { assertEquals(4000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(0)) { "The gpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(0)) { "The gpu used by the host is incorrect" } }, + // GPU 0 + { 
assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(0)) { "The gpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(0)) { "The gpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(0)) { "The gpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(0)) { "The gpu used by the host is incorrect" } }, + // GPU 1 + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(1)?.get(1)) { "The gpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuDemands["DualGpuHost"]?.get(9)?.get(1)) { "The gpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(1)?.get(1)) { "The gpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostGpuSupplied["DualGpuHost"]?.get(9)?.get(1)) { "The gpu used by the host is incorrect" } }, ) } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt index 59b8d070..7b3db348 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -148,8 +148,8 @@ fun runTest( class TestComputeMonitor : ComputeMonitor { var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>() var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>() - var taskGpuDemands = mutableMapOf<String, ArrayList<DoubleArray?>>() - var taskGpuSupplied = mutableMapOf<String, ArrayList<DoubleArray?>>() + var taskGpuDemands = mutableMapOf<String, ArrayList<Double?>?>() + var taskGpuSupplied = mutableMapOf<String, ArrayList<Double?>?>() 
override fun record(reader: TaskTableReader) { val taskName: String = reader.taskInfo.name @@ -162,11 +162,11 @@ class TestComputeMonitor : ComputeMonitor { taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) } if (taskName in taskGpuDemands) { - taskGpuDemands[taskName]?.add(reader.gpuDemands) - taskGpuSupplied[taskName]?.add(reader.gpuUsages) + taskGpuDemands[taskName]?.add(reader.gpuDemand) + taskGpuSupplied[taskName]?.add(reader.gpuUsage) } else { - taskGpuDemands[taskName] = arrayListOf(reader.gpuDemands) - taskGpuSupplied[taskName] = arrayListOf(reader.gpuUsages) + taskGpuDemands[taskName] = arrayListOf(reader.gpuDemand) + taskGpuSupplied[taskName] = arrayListOf(reader.gpuUsage) } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json b/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json new file mode 100644 index 00000000..65fb4b5f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/experiments/experiment_2.json @@ -0,0 +1,23 @@ +{ + "topologies": [ + { + "pathToFile": "src/test/resources/topologies/Gpus/small_gpu.json" + } + ], + "workloads": [{ + "pathToFile": "src/test/resources/workloadTraces/small_gpu", + "type": "ComputeWorkload" + }], + "allocationPolicies": [ + { + "type": "prefab", + "name": "ProvisionedCpuGpuCores" + } + ], + "exportModels": [ + { + "exportInterval": 3600, + "printFrequency": 24 + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json new file mode 100644 index 00000000..4848c303 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/Gpus/small_gpu.json @@ -0,0 +1,37 @@ +{ + "clusters": [ + { + "name": "C01", + "hosts": [ + { + "name": "H01", + "cpu": { + "coreCount": 4, + "coreSpeed": 2000 + }, + "memory": { + 
"memorySize": 140457600000 + }, + "cpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "gpu": { + "coreCount": 2, + "coreSpeed": 2000, + "count": 3 + }, + "gpuPowerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 + }, + "count": 4 + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet Binary files differnew file mode 100644 index 00000000..7dda2c97 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/fragments.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet Binary files differnew file mode 100644 index 00000000..23331729 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workloadTraces/small_gpu/tasks.parquet diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 8792552e..c9e3ab8c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -211,27 +211,43 @@ public class SimMachine { ResourceType.CPU, new ArrayList<>(List.of(new SimCpu(engine, this.machineModel.getCpuModel(), cpuPowerModel, 0)))); - new FlowEdge((FlowConsumer) this.computeResources.get(ResourceType.CPU).getFirst(), this.psu); + // Connect the CPU to the PSU + new FlowEdge( + (FlowConsumer) 
this.computeResources.get(ResourceType.CPU).getFirst(), + this.psu, + ResourceType.POWER, + 0, + -1); // Create a FlowDistributor and add the cpu as supplier this.distributors.put(ResourceType.CPU, new FlowDistributor(engine)); - new FlowEdge(this.distributors.get(ResourceType.CPU), (FlowSupplier) - this.computeResources.get(ResourceType.CPU).getFirst()); + new FlowEdge( + this.distributors.get(ResourceType.CPU), + (FlowSupplier) this.computeResources.get(ResourceType.CPU).getFirst(), + ResourceType.CPU, + -1, + 0); // TODO: include memory as flow node this.memory = new Memory(engine, this.machineModel.getMemory()); if (this.availableResources.contains(ResourceType.GPU)) { this.distributors.put(ResourceType.GPU, new FlowDistributor(engine)); - short i = 0; ArrayList<ComputeResource> gpus = new ArrayList<>(); for (GpuModel gpuModel : machineModel.getGpuModels()) { - SimGpu gpu = new SimGpu(engine, gpuModel, gpuPowerModel, i); + // create a new GPU + SimGpu gpu = new SimGpu(engine, gpuModel, gpuPowerModel, gpuModel.getId()); gpus.add(gpu); - // suspends here without the distributor - new FlowEdge(this.distributors.get(ResourceType.GPU), gpu); - new FlowEdge(gpu, this.psu); + // Connect the GPU to the distributor + new FlowEdge( + this.distributors.get(ResourceType.GPU), + gpu, + ResourceType.GPU, + gpuModel.getId(), + gpuModel.getId()); + // Connect the GPU to the PSU + new FlowEdge(gpu, this.psu, ResourceType.POWER, gpuModel.getId(), gpuModel.getId()); } this.computeResources.put(ResourceType.GPU, gpus); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java index 874194f6..e11d9cf2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java +++ 
b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/models/MachineModel.java @@ -35,7 +35,6 @@ import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy public final class MachineModel { private final CpuModel cpuModel; private final MemoryUnit memory; - // private final List<GpuModel> gpuModels = new ArrayList<>(); // TODO: Implement multi GPU support private final List<GpuModel> gpuModels; private final DistributionPolicy cpuDistributionStrategy; private final DistributionPolicy gpuDistributionPolicy; @@ -58,19 +57,8 @@ public final class MachineModel { this.gpuDistributionPolicy = gpuDistributionPolicy; this.availableResources.add(ResourceType.CPU); // TODO: Add Memory - // this.usedResources.add(ResourceType.Memory); if (gpuModels != null && !gpuModels.isEmpty()) { - // this.gpuModels = gpuModels; - this.gpuModels = new ArrayList<>(); - this.gpuModels.add(new GpuModel( - 0, - gpuModels.getFirst().getCoreCount() * gpuModels.size(), // merges multiple GPUs into one - gpuModels.getFirst().getCoreSpeed(), - gpuModels.getFirst().getMemoryBandwidth(), - gpuModels.getFirst().getMemorySize() * gpuModels.size(), // merges multiple GPUs into one - gpuModels.getFirst().getVendor(), - gpuModels.getFirst().getModelName(), - gpuModels.getFirst().getArchitecture())); + this.gpuModels = gpuModels; this.availableResources.add(ResourceType.GPU); } else { this.gpuModels = new ArrayList<>(); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index 1ea7c570..f40f4fec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -43,11 +43,14 @@ public final class SimPsu extends FlowNode 
implements FlowSupplier, FlowConsumer private static final Logger LOGGER = LoggerFactory.getLogger(SimPsu.class); private long lastUpdate; - private final HashMap<ResourceType, ArrayList<Double>> powerDemandsPerResource = new HashMap<>(); - private final HashMap<ResourceType, ArrayList<Double>> powerSuppliedPerResource = new HashMap<>(); + private final HashMap<ResourceType, HashMap<Integer, Double>> powerDemandsPerResource = new HashMap<>(); + private final HashMap<ResourceType, HashMap<Integer, Double>> powerSuppliedPerResource = new HashMap<>(); + + private double totalPowerDemand = 0.0; + private double totalPowerSupplied = 0.0; private double totalEnergyUsage = 0.0; - private final HashMap<ResourceType, ArrayList<FlowEdge>> resourceEdges = new HashMap<>(); + private final HashMap<ResourceType, HashMap<Integer, FlowEdge>> resourceEdges = new HashMap<>(); private FlowEdge powerSupplyEdge; private final double capacity = Long.MAX_VALUE; @@ -72,28 +75,51 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer * This method provides access to the power consumption of the machine before PSU losses are applied. */ public double getPowerDemand() { - return this.powerDemandsPerResource.values().stream() - .flatMap(List::stream) - .findFirst() - .orElse(0.0); + return this.totalPowerDemand; } + /** + * Return the power demand of the machine (in W) measured in the PSU for a specific resource type. + * <p> + * This method provides access to the power consumption of the machine before PSU losses are applied. 
+ */ public double getPowerDemand(ResourceType resourceType) { - return this.powerDemandsPerResource.get(resourceType).getFirst(); + // return this.powerDemandsPerResource.get(resourceType).stream().mapToDouble(Double::doubleValue).sum(); + return this.powerDemandsPerResource.get(resourceType).values().stream() + .mapToDouble(Double::doubleValue) + .sum(); + } + + /** + * Return the power demand of the machine (in W) measured in the PSU for a specific resource type for a specific resource. + * <p> + * This method provides access to the power consumption of the machine before PSU losses are applied. + */ + public double getPowerDemand(ResourceType resourceType, int id) { + return this.powerDemandsPerResource.get(resourceType).get(id); } /** * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply. */ public double getPowerDraw() { - return this.powerSuppliedPerResource.values().stream() - .flatMap(List::stream) - .findFirst() - .orElse(0.0); + return this.totalPowerSupplied; } + /** + * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply for a specific resource type. + */ public double getPowerDraw(ResourceType resourceType) { - return this.powerSuppliedPerResource.get(resourceType).getFirst(); + return this.powerSuppliedPerResource.get(resourceType).values().stream() + .mapToDouble(Double::doubleValue) + .sum(); + } + + /** + * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply for a specific resource type for a specific resource. 
+ */ + public double getPowerDraw(ResourceType resourceType, int id) { + return this.powerSuppliedPerResource.get(resourceType).get(id); } /** @@ -127,16 +153,22 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { updateCounters(); for (ResourceType resourceType : this.resourceEdges.keySet()) { - ArrayList<FlowEdge> edges = this.resourceEdges.get(resourceType); + HashMap<Integer, FlowEdge> edges = this.resourceEdges.get(resourceType); if (edges != null && !edges.isEmpty()) { - double powerSupply = - this.powerDemandsPerResource.get(resourceType).getFirst(); - double powerSupplied = - this.powerSuppliedPerResource.get(resourceType).getFirst(); - - if (powerSupply != powerSupplied) { - for (FlowEdge edge : edges) { - edge.pushSupply(powerSupply); + for (FlowEdge edge : edges.values()) { + // If the edge is null, it means that the edge has been removed -> no update is needed + if (edge == null) { + continue; + } + + int consumerIndex = edge.getConsumerIndex() == -1 ? 
0 : edge.getConsumerIndex(); + double powerDemand = + this.powerDemandsPerResource.get(resourceType).get(consumerIndex); + double powerSupplied = + this.powerSuppliedPerResource.get(resourceType).get(consumerIndex); + + if (powerDemand != powerSupplied) { + edge.pushSupply(powerDemand); } } } @@ -159,7 +191,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer long duration = now - lastUpdate; if (duration > 0) { for (ResourceType resourceType : this.powerSuppliedPerResource.keySet()) { - for (double powerSupplied : this.powerSuppliedPerResource.get(resourceType)) { + for (double powerSupplied : + this.powerSuppliedPerResource.get(resourceType).values()) { this.totalEnergyUsage += (powerSupplied * duration * 0.001); } } @@ -171,17 +204,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Override - public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand, ResourceType resourceType) { - this.powerDemandsPerResource.put(resourceType, new ArrayList<>(List.of(newDemand))); - powerSupplyEdge.pushDemand(newDemand); - } - - @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - double totalDemand = this.powerDemandsPerResource.values().stream() - .flatMap(List::stream) - .reduce(0.0, Double::sum); - this.powerSupplyEdge.pushDemand(totalDemand); + this.powerSupplyEdge.pushDemand(newDemand); } @Override @@ -191,7 +215,13 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply, ResourceType resourceType) { - this.powerSuppliedPerResource.put(resourceType, new ArrayList<>(List.of(newSupply))); + int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 
0 : consumerEdge.getConsumerIndex(); + + double previousSupply = this.powerSuppliedPerResource.get(resourceType).get(consumerIndex); + this.totalPowerSupplied += newSupply - previousSupply; + + this.powerSuppliedPerResource.get(resourceType).put(consumerIndex, newSupply); + consumerEdge.pushSupply(newSupply, false, resourceType); } @@ -203,18 +233,29 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand, ResourceType resourceType) { updateCounters(); - this.powerDemandsPerResource.put(resourceType, new ArrayList<>(List.of(newPowerDemand))); + int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex(); + + double previousPowerDemand = + this.powerDemandsPerResource.get(resourceType).get(consumerIndex); + this.totalPowerDemand += newPowerDemand - previousPowerDemand; - pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand); + this.powerDemandsPerResource.get(resourceType).put(consumerIndex, newPowerDemand); + + pushOutgoingDemand(this.powerSupplyEdge, totalPowerDemand); } @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { updateCounters(); for (ResourceType resourceType : this.resourceEdges.keySet()) { - for (FlowEdge edge : this.resourceEdges.get(resourceType)) { + for (FlowEdge edge : this.resourceEdges.get(resourceType).values()) { + // If the edge is null, it means that the edge has been removed -> no update is needed + if (edge == null) { + continue; + } + int consumerIndex = edge.getConsumerIndex() == -1 ? 
0 : edge.getConsumerIndex(); double outgoingSupply = - Math.min(this.powerDemandsPerResource.get(resourceType).getFirst(), newSupply); + Math.min(this.powerDemandsPerResource.get(resourceType).get(consumerIndex), newSupply); pushOutgoingSupply(edge, outgoingSupply, resourceType); } } @@ -222,10 +263,19 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void addConsumerEdge(FlowEdge consumerEdge) { + ResourceType consumerResourceType = consumerEdge.getConsumerResourceType(); - this.resourceEdges.put(consumerResourceType, new ArrayList<>(List.of(consumerEdge))); - this.powerDemandsPerResource.put(consumerResourceType, new ArrayList<>(List.of(0.0))); - this.powerSuppliedPerResource.put(consumerResourceType, new ArrayList<>(List.of(0.0))); + int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 0 : consumerEdge.getConsumerIndex(); + + if (!this.resourceEdges.containsKey(consumerResourceType)) { + this.resourceEdges.put(consumerResourceType, new HashMap<>()); + this.powerDemandsPerResource.put(consumerResourceType, new HashMap<>()); + this.powerSuppliedPerResource.put(consumerResourceType, new HashMap<>()); + } + + this.resourceEdges.get(consumerResourceType).put(consumerIndex, consumerEdge); + this.powerDemandsPerResource.get(consumerResourceType).put(consumerIndex, 0.0); + this.powerSuppliedPerResource.get(consumerResourceType).put(consumerIndex, 0.0); } @Override @@ -236,10 +286,18 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void removeConsumerEdge(FlowEdge consumerEdge) { ResourceType resourceType = consumerEdge.getConsumerResourceType(); + int consumerIndex = consumerEdge.getConsumerIndex() == -1 ? 
0 : consumerEdge.getConsumerIndex(); + if (this.resourceEdges.containsKey(resourceType)) { - this.resourceEdges.remove(resourceType); - this.powerDemandsPerResource.remove(resourceType); - this.powerSuppliedPerResource.remove(resourceType); + this.resourceEdges.get(resourceType).put(consumerIndex, null); + + this.totalPowerDemand -= + this.powerDemandsPerResource.get(resourceType).get(consumerIndex); + this.powerDemandsPerResource.get(resourceType).put(consumerIndex, 0.0); + + this.totalPowerSupplied -= + this.powerSuppliedPerResource.get(resourceType).get(consumerIndex); + this.powerSuppliedPerResource.get(resourceType).put(consumerIndex, 0.0); } } @@ -252,7 +310,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { List<FlowEdge> supplyingEdges = new ArrayList<>(); for (ResourceType resourceType : this.resourceEdges.keySet()) { - List<FlowEdge> edges = this.resourceEdges.get(resourceType); + List<FlowEdge> edges = + this.resourceEdges.get(resourceType).values().stream().toList(); if (edges != null && !edges.isEmpty()) { supplyingEdges.addAll(edges); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java index 622d2b89..8922a97d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/VirtualMachine.java @@ -56,7 +56,7 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { private final Hashtable<ResourceType, Double> resourceCapacities = new Hashtable<>(); private final Hashtable<ResourceType, Double> resourceTimeScalingFactor = new Hashtable<>(); // formerly known as d private final 
Hashtable<ResourceType, FlowEdge> distributorEdges = new Hashtable<>(); - private final Hashtable<ResourceType, List<PerformanceCounters>> resourcePerformanceCounters = new Hashtable<>(); + private final Hashtable<ResourceType, PerformanceCounters> resourcePerformanceCounters = new Hashtable<>(); private final long checkpointInterval; private final long checkpointDuration; @@ -108,21 +108,11 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { } public PerformanceCounters getCpuPerformanceCounters() { - return this.resourcePerformanceCounters.get(ResourceType.CPU).getFirst(); + return this.resourcePerformanceCounters.get(ResourceType.CPU); } - public List<PerformanceCounters> getGpuPerformanceCounters() { - return this.resourcePerformanceCounters.get(ResourceType.GPU) != null - ? this.resourcePerformanceCounters.get(ResourceType.GPU) - : new ArrayList<>(); - } - - public PerformanceCounters getGpuPerformanceCounters(int gpuId) { - List<PerformanceCounters> gpuPerformanceCounters = this.resourcePerformanceCounters.get(ResourceType.GPU); - if (gpuId < 0 || gpuId >= gpuPerformanceCounters.size()) { - throw new IndexOutOfBoundsException("No such GPU id: " + gpuId); - } - return gpuPerformanceCounters.get(gpuId); + public PerformanceCounters getGpuPerformanceCounters() { + return this.resourcePerformanceCounters.get(ResourceType.GPU); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -171,13 +161,9 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { this.resourceCapacities.put(resourceType, resources.getFirst().getCapacity()); - ArrayList<PerformanceCounters> performanceCounters = new ArrayList<>(); - - for (ComputeResource resource : resources) { - performanceCounters.add(new PerformanceCounters()); - this.resourceTimeScalingFactor.put(resourceType, 1.0 / resource.getCapacity()); - } - this.resourcePerformanceCounters.put(resourceType, 
performanceCounters); + this.resourceTimeScalingFactor.put( + resourceType, 1.0 / resources.getFirst().getCapacity()); + this.resourcePerformanceCounters.put(resourceType, new PerformanceCounters()); this.resourceDemands.put(resourceType, 0.0); this.resourceSupplies.put(resourceType, 0.0); } @@ -225,21 +211,24 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { for (ResourceType resourceType : this.availableResources) { int i = 0; final double factor = this.resourceTimeScalingFactor.get(resourceType) * delta; - for (PerformanceCounters performanceCounter : this.resourcePerformanceCounters.get(resourceType)) { - if (delta > 0) { - performanceCounter.addActiveTime(Math.round(this.resourceSupplies.get(resourceType) * factor)); - performanceCounter.setIdleTime(Math.round( - (this.resourceCapacities.get(resourceType) - this.resourceSupplies.get(resourceType)) - * factor)); - performanceCounter.addStealTime(Math.round( - (this.resourceDemands.get(resourceType) - this.resourceSupplies.get(resourceType)) - * factor)); - } - performanceCounter.setDemand(this.resourceDemands.get(resourceType)); - performanceCounter.setSupply(this.resourceSupplies.get(resourceType)); - performanceCounter.setCapacity(this.resourceCapacities.get(resourceType)); - i++; + if (delta > 0) { + this.resourcePerformanceCounters + .get(resourceType) + .addActiveTime(Math.round(this.resourceSupplies.get(resourceType) * factor)); + this.resourcePerformanceCounters + .get(resourceType) + .setIdleTime(Math.round( + (this.resourceCapacities.get(resourceType) - this.resourceSupplies.get(resourceType)) + * factor)); + this.resourcePerformanceCounters + .get(resourceType) + .addStealTime(Math.round( + (this.resourceDemands.get(resourceType) - this.resourceSupplies.get(resourceType)) + * factor)); } + this.resourcePerformanceCounters.get(resourceType).setDemand(this.resourceDemands.get(resourceType)); + 
this.resourcePerformanceCounters.get(resourceType).setSupply(this.resourceSupplies.get(resourceType)); + this.resourcePerformanceCounters.get(resourceType).setCapacity(this.resourceCapacities.get(resourceType)); } } @@ -317,7 +306,12 @@ public final class VirtualMachine extends SimWorkload implements FlowSupplier { @Override public void addSupplierEdge(FlowEdge supplierEdge) { ResourceType resourceType = supplierEdge.getSupplierResourceType(); - this.resourceCapacities.put(resourceType, supplierEdge.getCapacity()); + if (this.resourceCapacities.containsKey(resourceType) && this.resourceCapacities.get(resourceType) > 0) { + this.resourceCapacities.put( + resourceType, this.resourceCapacities.get(resourceType) + supplierEdge.getCapacity()); + } else { + this.resourceCapacities.put(resourceType, supplierEdge.getCapacity()); + } this.distributorEdges.put(resourceType, supplierEdge); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 674db8ca..f7fc2728 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -23,6 +23,7 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -32,20 +33,30 @@ import org.opendc.common.ResourceType; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy; import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + 
private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); - private FlowEdge supplierEdge; + private HashMap<Integer, FlowEdge> supplierEdges = + new HashMap<>(); // The suppliers that provide supply to this distributor private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers private double totalIncomingDemand; // The total demand of all the consumers - private double currentIncomingSupply; // The current supply provided by the supplier + // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable + private HashMap<Integer, Double> currentIncomingSupplies = + new HashMap<>(); // The current supply provided by the suppliers + private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers private boolean outgoingDemandUpdateNeeded = false; private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle + private ResourceType supplierResourceType; + private ResourceType consumerResourceType; + private boolean overloaded = false; private double capacity; // What is the max capacity. 
Can probably be removed @@ -66,7 +77,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu } public double getCurrentIncomingSupply() { - return currentIncomingSupply; + return this.totalIncomingSupply; } public double getCapacity() { @@ -90,7 +101,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu } private void updateOutgoingDemand() { - this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); + // equally distribute the demand to all suppliers + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); + // alternatively a relative share could be used, based on capacity minus current incoming supply + // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - + // currentIncomingSupplies.get(idx) / supplierEdges.size())); + } this.outgoingDemandUpdateNeeded = false; @@ -102,11 +119,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu // If the demand is higher than the current supply, the system is overloaded. // The available supply is distributed based on the current distribution function. 
- if (this.totalIncomingDemand > this.currentIncomingSupply) { + if (this.totalIncomingDemand > this.totalIncomingSupply) { this.overloaded = true; - double[] supplies = - this.distributionPolicy.distributeSupply(this.incomingDemands, this.currentIncomingSupply); + double[] supplies = this.distributionPolicy.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); @@ -151,13 +170,18 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.consumerEdges.add(consumerEdge); this.incomingDemands.add(0.0); this.outgoingSupplies.add(0.0); + this.consumerResourceType = consumerEdge.getConsumerResourceType(); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = supplierEdge; - this.capacity = supplierEdge.getCapacity(); - this.currentIncomingSupply = 0; + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); + + this.supplierEdges.put(idx, supplierEdge); + this.capacity += supplierEdge.getCapacity(); + this.currentIncomingSupplies.put(idx, 0.0); + this.supplierResourceType = supplierEdge.getSupplierResourceType(); } @Override @@ -202,13 +226,16 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu @Override public void removeSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = null; - this.capacity = 0; - this.currentIncomingSupply = 0; - - this.updatedDemands.clear(); - - this.closeNode(); + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 
0 : supplierEdge.getSupplierIndex(); + // to keep index consistent, entries are neutralized instead of removed + this.supplierEdges.put(idx, null); + this.capacity -= supplierEdge.getCapacity(); + this.currentIncomingSupplies.put(idx, 0.0); + + if (this.supplierEdges.isEmpty()) { + this.updatedDemands.clear(); + } } @Override @@ -216,7 +243,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { - System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); + LOGGER.warn("Demand {} pushed by an unknown consumer", newDemand); return; } @@ -224,6 +251,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu double prevDemand = incomingDemands.get(idx); incomingDemands.set(idx, newDemand); + // only update the total supply if the new supply is different from the previous one this.totalIncomingDemand += (newDemand - prevDemand); this.updatedDemands.add(idx); @@ -243,14 +271,20 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { - this.currentIncomingSupply = newSupply; + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 
0 : supplierEdge.getSupplierIndex(); + double prevSupply = currentIncomingSupplies.get(idx); + + currentIncomingSupplies.put(idx, newSupply); + // only update the total supply if the new supply is different from the previous one + this.totalIncomingSupply += (newSupply - prevSupply); this.invalidate(); } @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType()); + supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType()); } @Override @@ -267,23 +301,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType()); - consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType()); } @Override public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { - List<FlowEdge> supplyingEdges = (this.supplierEdge != null) ? List.of(this.supplierEdge) : List.of(); - - return Map.of(FlowEdge.NodeType.CONSUMING, supplyingEdges, FlowEdge.NodeType.SUPPLYING, this.consumerEdges); + return Map.of( + FlowEdge.NodeType.CONSUMING, + this.consumerEdges, + FlowEdge.NodeType.SUPPLYING, + new ArrayList<>(this.supplierEdges.values())); } @Override public ResourceType getSupplierResourceType() { - return this.supplierEdge.getSupplierResourceType(); + // return this.supplierEdge.getSupplierResourceType(); + return this.supplierResourceType; } @Override public ResourceType getConsumerResourceType() { - return this.consumerEdges.getFirst().getConsumerResourceType(); + return this.consumerResourceType; } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index aa3894c1..db2a2944 100644 --- 
a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -54,6 +54,15 @@ public class FlowEdge { } public FlowEdge(FlowConsumer consumer, FlowSupplier supplier, ResourceType resourceType) { + this(consumer, supplier, resourceType, -1, -1); + } + + public FlowEdge( + FlowConsumer consumer, + FlowSupplier supplier, + ResourceType resourceType, + int consumerIndex, + int supplierIndex) { if (!(consumer instanceof FlowNode)) { throw new IllegalArgumentException("Flow consumer is not a FlowNode"); } @@ -67,6 +76,10 @@ public class FlowEdge { this.capacity = supplier.getCapacity(resourceType); + // to avoid race condition of setting indices and requiring them in the PSU + this.supplierIndex = supplierIndex; + this.consumerIndex = consumerIndex; + this.consumer.addSupplierEdge(this); this.supplier.addConsumerEdge(this); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java index 9d2246cd..3a8bebbc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java @@ -25,5 +25,5 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; public interface DistributionPolicy { - double[] distributeSupply(ArrayList<Double> supply, double currentSupply); + double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply); } diff --git 
a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java index 40d70b5e..baa04975 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java @@ -42,7 +42,7 @@ public class FixedShare implements DistributionPolicy { } @Override - public double[] distributeSupply(ArrayList<Double> supply, double currentSupply) { + public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) { return new double[0]; } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java index 1d387349..484e7fe4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java @@ -34,7 +34,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy { private record Demand(int idx, double value) {} @Override - public double[] distributeSupply(ArrayList<Double> demands, double currentSupply) { + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -50,7 +50,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy { return i1.compareTo(i2); }); - double 
availableCapacity = currentSupply; // totalSupply + double availableCapacity = totalSupply; for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt index e7d35630..53e594de 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt @@ -110,6 +110,12 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) : Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("cpuUsage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpuCount"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpuUsage"), ) .named("resource_state") @@ -137,6 +143,12 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) 
: Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("cpu_usage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_usage"), ) .named("resource_state") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt index 9ad786d5..a53dcdb2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -87,7 +87,7 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate localCpuUsage = value } } - "gpu_count", "gpu_cores" -> + "gpu_count", "gpuCount", "gpu_cores", "gpuCores" -> object : PrimitiveConverter() { override fun addInt(value: Int) { localGpuCount = value |
