diff options
| author | Radu Nicolae <rnicolae04@gmail.com> | 2026-03-13 16:52:30 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-03-13 16:52:30 +0100 |
| commit | 8bb98c773bc982a0dab9cf9fb20d62f60a36a5d7 (patch) | |
| tree | cabec38c12a7ce1d8b4b513d7153a1a617823b3b | |
| parent | c7b473279714cf83dd8a4bca0d3c9a08511d021a (diff) | |
| parent | 3feb66c9fc1934aa3137f09d5a27eb2cfea138f0 (diff) | |
Merge pull request #389 from vlogic/add-feature-to-check-available-memory-on-simhosts
Refactor `SimHost` for improved memory tracking, error handling, and …
4 files changed, 261 insertions, 27 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index cb7d028c..164bbee7 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -229,8 +229,26 @@ public class SimHost( return this.guests.toList() } + /** + * Calculates the total memory used by the currently running tasks on the host. + * + * Iterates through the tasks mapped to guests in `taskToGuestMap`. For tasks that are in the + * `TaskState.RUNNING` state, their memory consumption is summed up. + * + * @return Total memory used by tasks currently in the RUNNING state, in bytes. + */ + private fun usedMemoryByRunningTasks(): Long { + var usedMemory: Long = 0 + for (vm in this.taskToGuestMap) { + if (vm.value.state == TaskState.RUNNING) { + usedMemory += vm.key.memorySize + } + } + return usedMemory + } + public fun canFit(task: ServiceTask): Boolean { - val sufficientMemory = model.memoryCapacity >= task.memorySize + val sufficientMemory = (model.memoryCapacity - this.usedMemoryByRunningTasks()) >= task.memorySize val enoughCpus = model.coreCount >= task.cpuCoreCount val canFit = simMachine!!.canFit(task.toMachineModel()) @@ -307,17 +325,8 @@ public class SimHost( for (guest in guests) { when (guest.state) { TaskState.RUNNING -> running++ - TaskState.FAILED, TaskState.TERMINATED -> { - failed++ - // Remove guests that have been deleted - this.taskToGuestMap.remove(guest.task) - guests.remove(guest) - } - TaskState.COMPLETED -> { - completed++ - this.taskToGuestMap.remove(guest.task) - guests.remove(guest) - } + TaskState.FAILED, TaskState.TERMINATED -> failed++ + TaskState.COMPLETED -> completed++ TaskState.PAUSED -> {} else -> invalid++ } @@ -337,8 +346,8 @@ public class SimHost( ) } - public fun getSystemStats(task: ServiceTask): GuestSystemStats { - val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } + public fun getSystemStats(task: ServiceTask): GuestSystemStats? { + val guest = taskToGuestMap[task] ?: return null return guest.getSystemStats() } @@ -359,8 +368,8 @@ public class SimHost( ) } - public fun getCpuStats(task: ServiceTask): GuestCpuStats { - val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } + public fun getCpuStats(task: ServiceTask): GuestCpuStats? { + val guest = taskToGuestMap[task] ?: return null return guest.getCpuStats() } @@ -388,7 +397,7 @@ public class SimHost( } public fun getGpuStats(task: ServiceTask): GuestGpuStats? { - val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } + val guest = taskToGuestMap[task] ?: return null return guest.getGpuStats() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt index be858e4f..5bf917dd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt @@ -235,17 +235,19 @@ public class TaskTableReaderImpl( */ override fun record(now: Instant) { val newHost = service.lookupHost(task) - if (newHost != null && newHost.getName() != simHost?.getName()) { + if (newHost != simHost) { simHost = newHost - hostInfo = - HostInfo( - newHost.getName(), - newHost.getClusterName(), - "x86", - newHost.getModel().coreCount, - newHost.getModel().cpuCapacity, - newHost.getModel().memoryCapacity, - ) + if (newHost != null) { + hostInfo = + HostInfo( + newHost.getName(), + newHost.getClusterName(), + "x86", + newHost.getModel().coreCount, + newHost.getModel().cpuCapacity, + newHost.getModel().memoryCapacity, + ) + } } val cpuStats = simHost?.getCpuStats(task) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostMemoryTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostMemoryTest.kt new file mode 100644 index 00000000..dc6d6a32 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostMemoryTest.kt @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2026 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.simulator.compute.machine.SimMachine +import org.opendc.simulator.compute.models.CpuModel +import org.opendc.simulator.compute.models.MachineModel +import org.opendc.simulator.compute.models.MemoryUnit +import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.engine.engine.FlowEngine +import org.opendc.simulator.engine.graph.FlowDistributor +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory +import java.time.InstantSource + +class SimHostMemoryTest { + @Test + fun testUsedMemoryByRunningTasks() { + val clock = InstantSource.fixed(java.time.Instant.EPOCH) + val engine = mockk<FlowEngine>(relaxed = true) + val cpuModel = CpuModel(0, 4, 2600.0, "vendor", "model", "arch") + val memoryUnit = MemoryUnit("vendor", "model", 3200.0, 1024) // 1024 MB memory + val machineModel = + MachineModel( + cpuModel, + memoryUnit, + null, + FlowDistributorFactory.DistributionPolicy.MAX_MIN_FAIRNESS, + FlowDistributorFactory.DistributionPolicy.MAX_MIN_FAIRNESS, + ) + val powerModel = mockk<PowerModel>(relaxed = true) + val distributor = mockk<FlowDistributor>(relaxed = true) + val simMachine = mockk<SimMachine>(relaxed = true) + every { simMachine.canFit(any()) } returns true + + val host = + SimHost( + name = "H01", + type = "host", + clusterName = "C01", + clock = clock, + engine = engine, + machineModel = machineModel, + cpuPowerModel = powerModel, + gpuPowerModel = null, + embodiedCarbon = 0.0, + expectedLifetime = 0.0, + powerDistributor = distributor, + ) + + // Use reflection to set simMachine if needed, but SimHost.launch() sets it. + // Actually SimHost has it as private var simMachine: SimMachine? = null + // Let's try to trigger launch or just use reflection for testing private state. + val simMachineField = host.javaClass.getDeclaredField("simMachine") + simMachineField.isAccessible = true + simMachineField.set(host, simMachine) + + val task1 = mockk<ServiceTask>(relaxed = true) + every { task1.memorySize } returns 512L + every { task1.cpuCoreCount } returns 1 + + val task2 = mockk<ServiceTask>(relaxed = true) + every { task2.memorySize } returns 512L + every { task2.cpuCoreCount } returns 1 + + val task3 = mockk<ServiceTask>(relaxed = true) + every { task3.memorySize } returns 256L + every { task3.cpuCoreCount } returns 1 + + // Initially can fit task1 and task2 (512 + 512 = 1024) + assertTrue(host.canFit(task1), "Task 1 should fit initially") + + // Mock taskToGuestMap to simulate running tasks + val taskToGuestMapField = host.javaClass.getDeclaredField("taskToGuestMap") + taskToGuestMapField.isAccessible = true + val taskToGuestMap = taskToGuestMapField.get(host) as MutableMap<ServiceTask, Any> + + val guest1 = mockk<org.opendc.compute.simulator.internal.Guest>(relaxed = true) + every { guest1.state } returns TaskState.RUNNING + + taskToGuestMap[task1] = guest1 + + // After task1 is RUNNING, used memory is 512. host capacity is 1024. + // canFit(task2) should be true (1024 - 512 >= 512) + assertTrue(host.canFit(task2), "Task 2 should fit when Task 1 is running") + + val guest2 = mockk<org.opendc.compute.simulator.internal.Guest>(relaxed = true) + every { guest2.state } returns TaskState.RUNNING + taskToGuestMap[task2] = guest2 + + // After task1 and task2 are RUNNING, used memory is 1024. + // canFit(task3) should be false (1024 - 1024 < 256) + assertFalse(host.canFit(task3), "Task 3 should not fit when Task 1 and 2 are running") + + // If guest2 stops + every { guest2.state } returns TaskState.COMPLETED + assertTrue(host.canFit(task3), "Task 3 should fit after Task 2 stops running") + + // If guest2 fails + every { guest2.state } returns TaskState.FAILED + assertTrue(host.canFit(task3), "Task 3 should fit after Task 2 fails") + + // If guest1 is paused + every { guest1.state } returns TaskState.PAUSED + assertTrue(host.canFit(task1), "Task 1 should fit when only task 1 is on host and it's paused") + // But task1 is in taskToGuestMap. Memory calculation should not include it if it's not RUNNING. + // Wait, if task1 is PAUSED, usedMemoryByRunningTasks() will sum 0 for it. + // So host.canFit(task1) should return (1024 - 0) >= 512, which is true. Correct. + + // Add a running task3 + val guest3 = mockk<org.opendc.compute.simulator.internal.Guest>(relaxed = true) + every { guest3.state } returns TaskState.RUNNING + taskToGuestMap[task3] = guest3 + // Memory: task1 (PAUSED, 0) + task2 (FAILED, 0) + task3 (RUNNING, 256) = 256 + // canFit task1 (512): (1024 - 256) >= 512 -> 768 >= 512 (true) + assertTrue(host.canFit(task1), "Task 1 should fit with only task 3 running") + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTelemetryTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTelemetryTest.kt new file mode 100644 index 00000000..b743dc0e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTelemetryTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2026 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host + +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertDoesNotThrow +import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.simulator.compute.models.CpuModel +import org.opendc.simulator.compute.models.MachineModel +import org.opendc.simulator.compute.models.MemoryUnit +import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.engine.engine.FlowEngine +import org.opendc.simulator.engine.graph.FlowDistributor +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory +import java.time.InstantSource + +class SimHostTelemetryTest { + @Test + fun testGetCpuStatsForMissingTask() { + val clock = InstantSource.fixed(java.time.Instant.EPOCH) + val engine = mockk<FlowEngine>(relaxed = true) + val cpuModel = CpuModel(0, 4, 2600.0, "vendor", "model", "arch") + val memoryUnit = MemoryUnit("vendor", "model", 3200.0, 1024) + val machineModel = + MachineModel( + cpuModel, + memoryUnit, + null, + FlowDistributorFactory.DistributionPolicy.MAX_MIN_FAIRNESS, + FlowDistributorFactory.DistributionPolicy.MAX_MIN_FAIRNESS, + ) + val powerModel = mockk<PowerModel>(relaxed = true) + val distributor = mockk<FlowDistributor>(relaxed = true) + + val host = + SimHost( + name = "H01", + type = "host", + clusterName = "C01", + clock = clock, + engine = engine, + machineModel = machineModel, + cpuPowerModel = powerModel, + gpuPowerModel = null, + embodiedCarbon = 0.0, + expectedLifetime = 0.0, + powerDistributor = distributor, + ) + + val task = mockk<ServiceTask>(relaxed = true) + io.mockk.every { task.name } returns "T01" + + // This should not throw even if the task is not on the host + assertDoesNotThrow { + host.getCpuStats(task) + } + } +} |
