diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src')
10 files changed, 335 insertions, 6 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index eb8d3377..69625306 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -25,6 +25,7 @@ package org.opendc.compute.simulator.service; import java.time.Duration; import java.time.Instant; import java.time.InstantSource; +import java.time.temporal.TemporalAmount; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -499,7 +500,6 @@ public final class ComputeService implements AutoCloseable { SimHost host = hv.getHost(); // Remove request from queue - taskQueue.remove(req); tasksPending--; LOGGER.info("Assigned task {} to host {}", task, host); @@ -642,6 +642,9 @@ public final class ComputeService implements AutoCloseable { @NotNull public ServiceTask newTask( @NotNull String name, + @NotNull TaskNature nature, + @NotNull TemporalAmount duration, + @NotNull Long deadline, @NotNull ServiceFlavor flavor, @NotNull Workload workload, @NotNull Map<String, ?> meta) { @@ -652,9 +655,9 @@ public final class ComputeService implements AutoCloseable { // final ServiceFlavor internalFlavor = // Objects.requireNonNull(service.flavorById.get(flavor.getUid()), "Unknown flavor"); - // ServiceTask task = new ServiceTask(service, uid, name, internalFlavor, workload, meta); - ServiceTask task = new ServiceTask(service, uid, name, flavor, workload, meta); + + ServiceTask task = new ServiceTask(service, uid, name, nature, duration, deadline, flavor, workload, meta); service.taskById.put(uid, task); diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java index dac65d67..4d5611a8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -23,6 +23,7 @@ package org.opendc.compute.simulator.service; import java.time.Instant; +import java.time.temporal.TemporalAmount; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,6 +50,9 @@ public class ServiceTask { private final UUID uid; private final String name; + private final TaskNature nature; + private final TemporalAmount duration; + private final Long deadline; private ServiceFlavor flavor; public Workload workload; @@ -68,12 +72,18 @@ public class ServiceTask { ComputeService service, UUID uid, String name, + TaskNature nature, + TemporalAmount duration, + Long deadline, ServiceFlavor flavor, Workload workload, Map<String, ?> meta) { this.service = service; this.uid = uid; this.name = name; + this.nature = nature; + this.duration = duration; + this.deadline = deadline; this.flavor = flavor; this.workload = workload; this.meta = meta; @@ -87,6 +97,21 @@ public class ServiceTask { } @NotNull + public TaskNature getNature() { + return nature; + } + + @NotNull + public TemporalAmount getDuration() { + return duration; + } + + @NotNull + public Long getDeadline() { + return deadline; + } + + @NotNull public String getName() { return name; } diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/TaskNature.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/TaskNature.java new file mode 100644 index 00000000..ffb49143 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/TaskNature.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +public class TaskNature { + + public final boolean deferrable; + + public TaskNature(boolean deferrable) { + this.deferrable = deferrable; + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index a18856f8..1145c15b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -29,6 +29,7 @@ import org.opendc.compute.simulator.telemetry.ComputeMonitor import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec +import org.opendc.simulator.compute.power.CarbonReceiver import java.time.Duration /** @@ -84,7 +85,8 @@ public fun registerComputeMonitor( public fun setupHosts( serviceDomain: String, specs: List<ClusterSpec>, + carbonReceivers: List<CarbonReceiver>, startTime: Long = 0L, ): ProvisioningStep { - return HostsProvisioningStep(serviceDomain, specs, startTime) + return HostsProvisioningStep(serviceDomain, specs, carbonReceivers, startTime) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 68e263f8..c347e866 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -29,6 +29,7 @@ import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec import org.opendc.compute.topology.specs.createSimBatteryPolicy import org.opendc.simulator.compute.power.CarbonModel +import org.opendc.simulator.compute.power.CarbonReceiver import org.opendc.simulator.compute.power.SimPowerSource import org.opendc.simulator.compute.power.batteries.BatteryAggregator import org.opendc.simulator.compute.power.batteries.SimBattery @@ -46,6 +47,7 @@ import org.opendc.simulator.engine.graph.FlowEdge public class HostsProvisioningStep internal constructor( private val serviceDomain: String, private val clusterSpecs: List<ClusterSpec>, + private val carbonReceivers: List<CarbonReceiver>, private val startTime: Long = 0L, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { @@ -75,6 +77,9 @@ public class HostsProvisioningStep internal constructor( if (carbonFragments != null) { carbonModel = CarbonModel(engine, carbonFragments, startTime) carbonModel.addReceiver(simPowerSource) + for (receiver in carbonReceivers) { + carbonModel.addReceiver(receiver) + } } if (cluster.battery != null) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt index 8f2369dc..b83eafdd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt @@ -31,6 +31,7 @@ import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher import org.opendc.compute.simulator.scheduler.weights.InstanceCountWeigher import org.opendc.compute.simulator.scheduler.weights.RamWeigher import org.opendc.compute.simulator.scheduler.weights.VCpuWeigher +import java.time.InstantSource import java.util.SplittableRandom import java.util.random.RandomGenerator @@ -45,13 +46,15 @@ public enum class ComputeSchedulerEnum { ProvisionedCoresInv, Random, TaskNumMemorizing, + Timeshift, } public fun createComputeScheduler( name: String, seeder: RandomGenerator, + clock: InstantSource, ): ComputeScheduler { - return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder) + return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder, clock) } /** @@ -60,6 +63,7 @@ public fun createComputeScheduler( public fun createComputeScheduler( name: ComputeSchedulerEnum, seeder: RandomGenerator, + clock: InstantSource, ): ComputeScheduler { val cpuAllocationRatio = 1.0 val ramAllocationRatio = 1.5 @@ -116,5 +120,13 @@ public fun createComputeScheduler( filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), random = SplittableRandom(seeder.nextLong()), ) + ComputeSchedulerEnum.Timeshift -> + TimeshiftScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)), + windowSize = 168, + clock = clock, + random = SplittableRandom(seeder.nextLong()), + ) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt index 832482eb..386e0d63 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt @@ -79,7 +79,6 @@ public class FilterScheduler( } val task = req.task - val hosts = hosts val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } } val subset = diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt index c48e6fb0..faa1be6b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt @@ -88,6 +88,7 @@ public class MemorizingScheduler( taskloop@ for (req in iter) { if (req.isCancelled) { iter.remove() + continue } for (chosenListIndex in minAvailableHost until hostsQueue.size) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt new file mode 100644 index 00000000..a856f366 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import org.opendc.compute.simulator.scheduler.filters.HostFilter +import org.opendc.compute.simulator.scheduler.weights.HostWeigher +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.simulator.compute.power.CarbonModel +import org.opendc.simulator.compute.power.CarbonReceiver +import java.time.Instant +import java.time.InstantSource +import java.util.LinkedList +import java.util.SplittableRandom +import java.util.random.RandomGenerator +import kotlin.math.min + +public class TimeshiftScheduler( + private val filters: List<HostFilter>, + private val weighers: List<HostWeigher>, + private val windowSize: Int, + private val clock: InstantSource, + private val subsetSize: Int = 1, + private val random: RandomGenerator = SplittableRandom(0), +) : ComputeScheduler, CarbonReceiver { + /** + * The pool of hosts available to the scheduler. + */ + private val hosts = mutableListOf<HostView>() + + init { + require(subsetSize >= 1) { "Subset size must be one or greater" } + } + + private val pastCarbonIntensities = LinkedList<Double>() + private var carbonRunningSum = 0.0 + + private var carbonIntensity = 0.0 + +// private var lowerCarbonIntensity = 0.0 + private var thresholdCarbonIntensity = 0.0 + + override fun addHost(host: HostView) { + hosts.add(host) + } + + override fun removeHost(host: HostView) { + hosts.remove(host) + } + + override fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult { + var result: SchedulingResult? = null + for (req in iter) { + if (req.isCancelled) { + iter.remove() + continue + } + + val task = req.task + + if (carbonIntensity > thresholdCarbonIntensity) { + if (task.nature.deferrable) { + val currentTime = clock.instant() + val estimatedCompletion = currentTime.plus(task.duration) + val deadline = Instant.ofEpochMilli(task.deadline) + if (estimatedCompletion.isBefore(deadline)) { + // No need to schedule this task in a high carbon intensity period + continue + } + } + } + + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } } + + val subset = + if (weighers.isNotEmpty()) { + val filterResults = weighers.map { it.getWeights(filteredHosts, task) } + val weights = DoubleArray(filteredHosts.size) + + for (fr in filterResults) { + val min = fr.min + val range = (fr.max - min) + + // Skip result if all weights are the same + if (range == 0.0) { + continue + } + + val multiplier = fr.multiplier + val factor = multiplier / range + + for ((i, weight) in fr.weights.withIndex()) { + weights[i] += factor * (weight - min) + } + } + + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } + + val maxSize = min(subsetSize, subset.size) + if (maxSize == 0) { + result = SchedulingResult(SchedulingResultType.FAILURE, null, req) + break + } else { + iter.remove() + result = SchedulingResult(SchedulingResultType.SUCCESS, subset[random.nextInt(maxSize)], req) + break + } + } + + if (result == null) return SchedulingResult(SchedulingResultType.EMPTY) + + return result + } + + override fun removeTask( + task: ServiceTask, + host: HostView?, + ) {} + + override fun updateCarbonIntensity(newCarbonIntensity: Double) { + this.carbonIntensity = newCarbonIntensity + this.pastCarbonIntensities.addLast(newCarbonIntensity) + this.carbonRunningSum += newCarbonIntensity + if (this.pastCarbonIntensities.size > this.windowSize) { + this.carbonRunningSum -= this.pastCarbonIntensities.removeFirst() + } + this.thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size + } + + override fun setCarbonModel(carbonModel: CarbonModel?) {} + + override fun removeCarbonModel(carbonModel: CarbonModel?) {} +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt new file mode 100644 index 00000000..b893c1aa --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.service.TaskNature +import java.time.Duration +import java.time.Instant +import java.time.InstantSource + +class TimeshiftSchedulerTest { + @Test + fun testBasicDeferring() { + val clock = mockk<InstantSource>() + every { clock.instant() } returns Instant.ofEpochMilli(10) + + val scheduler = + TimeshiftScheduler( + filters = emptyList(), + weighers = emptyList(), + windowSize = 2, + clock = clock, + ) + + val req = mockk<SchedulingRequest>() + every { req.task.flavor.coreCount } returns 2 + every { req.task.flavor.memorySize } returns 1024 + every { req.isCancelled } returns false + every { req.task.nature } returns TaskNature(true) + every { req.task.duration } returns Duration.ofMillis(10) + every { req.task.deadline } returns 50 + + scheduler.updateCarbonIntensity(100.0) + scheduler.updateCarbonIntensity(200.0) + + assertEquals(SchedulingResultType.EMPTY, scheduler.select(mutableListOf(req).iterator()).resultType) + } + + @Test + fun testRespectDeadline() { + val clock = mockk<InstantSource>() + every { clock.instant() } returns Instant.ofEpochMilli(10) + + val scheduler = + TimeshiftScheduler( + filters = emptyList(), + weighers = emptyList(), + windowSize = 2, + clock = clock, + ) + + val req = mockk<SchedulingRequest>() + every { req.task.flavor.coreCount } returns 2 + every { req.task.flavor.memorySize } returns 1024 + every { req.isCancelled } returns false + every { req.task.nature } returns TaskNature(true) + every { req.task.duration } returns Duration.ofMillis(10) + every { req.task.deadline } returns 20 + + scheduler.updateCarbonIntensity(100.0) + scheduler.updateCarbonIntensity(200.0) + + // The scheduler tries to schedule the task, but fails as there are no hosts. + assertEquals(SchedulingResultType.FAILURE, scheduler.select(mutableListOf(req).iterator()).resultType) + } +} |
