diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-10-25 13:32:41 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-25 13:32:41 +0200 |
| commit | 5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch) | |
| tree | 72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-compute/opendc-compute-simulator | |
| parent | 27f5b7dcb05aefdab9b762175d538931face0aba (diff) | |
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests.
Improved checkpointing model
Improved model, started with SimPowerSource
implemented FailureModels and Checkpointing
First working version
midway commit
first update
All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability.
* fixed merge conflicts
* Updated M3SA paths.
* Fixed small typo
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
61 files changed, 5087 insertions, 1120 deletions
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 20ceb93e..8cbddb44 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -25,22 +25,25 @@ description = "Simulator for OpenDC Compute" // Build configuration plugins { `kotlin-library-conventions` + kotlin("plugin.serialization") version "1.9.22" } dependencies { - api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) + api(projects.opendcTrace.opendcTraceParquet) api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0") api(libs.microprofile.config) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) - implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-api"))) + implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.slf4j.simple) + testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java new file mode 100644 index 00000000..01acfa97 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host; + +import org.opendc.compute.api.TaskState; +import org.opendc.compute.simulator.service.ServiceTask; + +/** + * Listener interface for events originating from a {@link SimHost}. + */ +public interface HostListener { + /** + * This method is invoked when the state of <code>task</code> on <code>host</code> changes. + */ + default void onStateChanged(SimHost host, ServiceTask task, TaskState newState) {} + + /** + * This method is invoked when the state of a {@link SimHost} has changed. + */ + default void onStateChanged(SimHost host, HostState newState) {} +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java new file mode 100644 index 00000000..96236c5c --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host; + +/** + * Record describing the static machine properties of the host. + * + * @param cpuCapacity The total CPU capacity of the host in MHz. + * @param coreCount The number of logical processing cores available for this host. + * @param memoryCapacity The amount of memory available for this host in MB. + */ +public record HostModel(float cpuCapacity, int coreCount, long memoryCapacity) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java new file mode 100644 index 00000000..29fc8cb4 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host; + +/** + * The state of a host. + */ +public enum HostState { + /** + * The host is up and able to host guests. + */ + UP, + + /** + * The host is in a (forced) down state and unable to host any guests. + */ + DOWN, + + /** + * The host is in an error state and unable to host any guests. + */ + ERROR +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java new file mode 100644 index 00000000..84e23516 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -0,0 +1,652 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +import java.time.Duration; +import java.time.Instant; +import java.time.InstantSource; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SplittableRandom; +import java.util.UUID; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.opendc.common.Dispatcher; +import org.opendc.common.util.Pacer; +import org.opendc.compute.api.Flavor; +import org.opendc.compute.api.Image; +import org.opendc.compute.api.TaskState; +import org.opendc.compute.simulator.host.HostListener; +import org.opendc.compute.simulator.host.HostModel; +import org.opendc.compute.simulator.host.HostState; +import org.opendc.compute.simulator.host.SimHost; +import org.opendc.compute.simulator.scheduler.ComputeScheduler; +import org.opendc.compute.simulator.telemetry.SchedulerStats; +import org.opendc.simulator.compute.workload.Workload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link ComputeService} hosts the API implementation of the OpenDC Compute Engine. + */ +public final class ComputeService implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ComputeService.class); + + /** + * The {@link InstantSource} representing the clock tracking the (simulation) time. + */ + private final InstantSource clock; + + /** + * The {@link ComputeScheduler} responsible for placing the tasks onto hosts. + */ + private final ComputeScheduler scheduler; + + /** + * The {@link Pacer} used to pace the scheduling requests. + */ + private final Pacer pacer; + + /** + * The {@link SplittableRandom} used to generate the unique identifiers for the service resources. + */ + private final SplittableRandom random = new SplittableRandom(0); + + private final int maxNumFailures; + + /** + * A flag to indicate that the service is closed. + */ + private boolean isClosed; + + /** + * A mapping from host to host view. + */ + private final Map<SimHost, HostView> hostToView = new HashMap<>(); + + /** + * The available hypervisors. + */ + private final Set<HostView> availableHosts = new HashSet<>(); + + /** + * The tasks that should be launched by the service. + */ + private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>(); + + /** + * The active tasks in the system. + */ + private final Map<ServiceTask, SimHost> activeTasks = new HashMap<>(); + + /** + * The active tasks in the system. + */ + private final Map<ServiceTask, SimHost> completedTasks = new HashMap<>(); + + /** + * The registered flavors for this compute service. + */ + private final Map<UUID, ServiceFlavor> flavorById = new HashMap<>(); + + private final List<ServiceFlavor> flavors = new ArrayList<>(); + + /** + * The registered images for this compute service. + */ + private final Map<UUID, ServiceImage> imageById = new HashMap<>(); + + private final List<ServiceImage> images = new ArrayList<>(); + + /** + * The registered tasks for this compute service. + */ + private final Map<UUID, ServiceTask> taskById = new HashMap<>(); + + private final List<ServiceTask> tasks = new ArrayList<>(); + + private final List<ServiceTask> tasksToRemove = new ArrayList<>(); + + /** + * A [HostListener] used to track the active tasks. + */ + private final HostListener hostListener = new HostListener() { + @Override + public void onStateChanged(@NotNull SimHost host, @NotNull HostState newState) { + LOGGER.debug("Host {} state changed: {}", host, newState); + + final HostView hv = hostToView.get(host); + + if (hv != null) { + if (newState == HostState.UP) { + availableHosts.add(hv); + } else { + availableHosts.remove(hv); + } + } + + // Re-schedule on the new machine + requestSchedulingCycle(); + } + + @Override + public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @NotNull TaskState newState) { + if (task.getHost() != host) { + // This can happen when a task is rescheduled and started on another machine, while being deleted from + // the old machine. + return; + } + + task.setState(newState); + + if (newState == TaskState.COMPLETED || newState == TaskState.TERMINATED || newState == TaskState.FAILED) { + LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor()); + + if (activeTasks.remove(task) != null) { + tasksActive--; + } + + HostView hv = hostToView.get(host); + final ServiceFlavor flavor = task.getFlavor(); + if (hv != null) { + hv.provisionedCores -= flavor.getCoreCount(); + hv.instanceCount--; + hv.availableMemory += flavor.getMemorySize(); + } else { + LOGGER.error("Unknown host {}", host); + } + + task.setHost(null); + host.removeTask(task); + + if (newState == TaskState.COMPLETED) { + tasksCompleted++; + } + if (newState == TaskState.TERMINATED) { + tasksTerminated++; + } + + if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) { + tasksToRemove.add(task); + } + + // Try to reschedule if needed + requestSchedulingCycle(); + } + } + }; + + private int maxCores = 0; + private long maxMemory = 0L; + private long attemptsSuccess = 0L; + private long attemptsFailure = 0L; + private int tasksTotal = 0; + private int tasksPending = 0; + private int tasksActive = 0; + private int tasksTerminated = 0; + private int tasksCompleted = 0; + + /** + * Construct a {@link ComputeService} instance. + */ + public ComputeService(Dispatcher dispatcher, ComputeScheduler scheduler, Duration quantum, int maxNumFailures) { + this.clock = dispatcher.getTimeSource(); + this.scheduler = scheduler; + this.pacer = new Pacer(dispatcher, quantum.toMillis(), (time) -> doSchedule()); + this.maxNumFailures = maxNumFailures; + } + + /** + * Create a new {@link Builder} instance. + */ + public static Builder builder(Dispatcher dispatcher, ComputeScheduler scheduler) { + return new Builder(dispatcher, scheduler); + } + + /** + * Create a new {@link ComputeClient} to control the compute service. + */ + public ComputeClient newClient() { + if (isClosed) { + throw new IllegalStateException("Service is closed"); + } + return new ComputeClient(this); + } + + /** + * Return the {@link ServiceTask}s hosted by this service. + */ + public List<ServiceTask> getTasks() { + return Collections.unmodifiableList(tasks); + } + + /** + * Return the {@link ServiceTask}s hosted by this service. + */ + public List<ServiceTask> getTasksToRemove() { + return Collections.unmodifiableList(tasksToRemove); + } + + public void clearTasksToRemove() { + this.tasksToRemove.clear(); + } + + /** + * Add a {@link SimHost} to the scheduling pool of the compute service. + */ + public void addHost(SimHost host) { + // Check if host is already known + if (hostToView.containsKey(host)) { + return; + } + + HostView hv = new HostView(host); + HostModel model = host.getModel(); + + maxCores = Math.max(maxCores, model.coreCount()); + maxMemory = Math.max(maxMemory, model.memoryCapacity()); + hostToView.put(host, hv); + + if (host.getState() == HostState.UP) { + availableHosts.add(hv); + } + + scheduler.addHost(hv); + host.addListener(hostListener); + } + + /** + * Remove a {@link SimHost} from the scheduling pool of the compute service. + */ + public void removeHost(SimHost host) { + HostView view = hostToView.remove(host); + if (view != null) { + availableHosts.remove(view); + scheduler.removeHost(view); + host.removeListener(hostListener); + } + } + + /** + * Lookup the {@link SimHost} that currently hosts the specified {@link ServiceTask}. + */ + public SimHost lookupHost(ServiceTask task) { + return task.getHost(); + } + + /** + * Return the {@link SimHost}s that are registered with this service. + */ + public Set<SimHost> getHosts() { + return Collections.unmodifiableSet(hostToView.keySet()); + } + + public InstantSource getClock() { + return this.clock; + } + + /** + * Collect the statistics about the scheduler component of this service. + */ + public SchedulerStats getSchedulerStats() { + return new SchedulerStats( + availableHosts.size(), + hostToView.size() - availableHosts.size(), + attemptsSuccess, + attemptsFailure, + tasksTotal, + tasksPending, + tasksActive, + tasksCompleted, + tasksTerminated); + } + + @Override + public void close() { + if (isClosed) { + return; + } + + isClosed = true; + pacer.cancel(); + } + + /** + * Enqueue the specified [task] to be scheduled onto a host. + */ + SchedulingRequest schedule(ServiceTask task) { + LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid()); + + long now = clock.millis(); + SchedulingRequest request = new SchedulingRequest(task, now); + + task.launchedAt = Instant.ofEpochMilli(now); + taskQueue.add(request); + tasksPending++; + requestSchedulingCycle(); + return request; + } + + void delete(ServiceFlavor flavor) { + flavorById.remove(flavor.getUid()); + flavors.remove(flavor); + } + + void delete(ServiceImage image) { + imageById.remove(image.getUid()); + images.remove(image); + } + + void delete(ServiceTask task) { + completedTasks.remove(task); + taskById.remove(task.getUid()); + tasks.remove(task); + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private void requestSchedulingCycle() { + // Bail out in case the queue is empty. + if (taskQueue.isEmpty()) { + return; + } + + pacer.enqueue(); + } + + /** + * Run a single scheduling iteration. + */ + private void doSchedule() { + // reorder tasks + + while (!taskQueue.isEmpty()) { + SchedulingRequest request = taskQueue.peek(); + + if (request.isCancelled) { + taskQueue.poll(); + tasksPending--; + continue; + } + + final ServiceTask task = request.task; + + if (task.getNumFailures() >= maxNumFailures) { + LOGGER.warn("task {} has been terminated because it failed {} times", task, task.getNumFailures()); + + taskQueue.poll(); + tasksPending--; + tasksTerminated++; + task.setState(TaskState.TERMINATED); + tasksToRemove.add(task); + continue; + } + + final ServiceFlavor flavor = task.getFlavor(); + final HostView hv = scheduler.select(request.task); + + if (hv == null || !hv.getHost().canFit(task)) { + LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task); + + if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) { + // Remove the incoming image + taskQueue.poll(); + tasksPending--; + + LOGGER.warn("Failed to spawn {}: does not fit", task); + + task.setState(TaskState.FAILED); + continue; + } else { + break; + } + } + + SimHost host = hv.getHost(); + + // Remove request from queue + taskQueue.poll(); + tasksPending--; + + LOGGER.info("Assigned task {} to host {}", task, host); + + try { + task.host = host; + + host.spawn(task); + // host.start(task); + + tasksActive++; + attemptsSuccess++; + + hv.instanceCount++; + hv.provisionedCores += flavor.getCoreCount(); + hv.availableMemory -= flavor.getMemorySize(); + + activeTasks.put(task, host); + } catch (Exception cause) { + LOGGER.error("Failed to deploy VM", cause); + attemptsFailure++; + } + } + } + + /** + * Builder class for a {@link ComputeService}. + */ + public static class Builder { + private final Dispatcher dispatcher; + private final ComputeScheduler computeScheduler; + private Duration quantum = Duration.ofSeconds(1); + private int maxNumFailures = 10; + + Builder(Dispatcher dispatcher, ComputeScheduler computeScheduler) { + this.dispatcher = dispatcher; + this.computeScheduler = computeScheduler; + } + + /** + * Set the scheduling quantum of the service. + */ + public Builder withQuantum(Duration quantum) { + this.quantum = quantum; + return this; + } + + public Builder withMaxNumFailures(int maxNumFailures) { + this.maxNumFailures = maxNumFailures; + return this; + } + + /** + * Build a {@link ComputeService}. + */ + public ComputeService build() { + return new ComputeService(dispatcher, computeScheduler, quantum, maxNumFailures); + } + } + + /** + * Implementation of {@link ComputeClient} using a {@link ComputeService}. + */ + public static class ComputeClient { + private final ComputeService service; + private boolean isClosed; + + ComputeClient(ComputeService service) { + this.service = service; + } + + /** + * Method to check if the client is still open and throw an exception if it is not. + */ + private void checkOpen() { + if (isClosed) { + throw new IllegalStateException("Client is already closed"); + } + } + + @NotNull + public List<Flavor> queryFlavors() { + checkOpen(); + return new ArrayList<>(service.flavors); + } + + public Flavor findFlavor(@NotNull UUID id) { + checkOpen(); + + return service.flavorById.get(id); + } + + @NotNull + public Flavor newFlavor(@NotNull String name, int cpuCount, long memorySize, @NotNull Map<String, ?> meta) { + checkOpen(); + + final ComputeService service = this.service; + UUID uid = new UUID(service.clock.millis(), service.random.nextLong()); + ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, meta); + + service.flavorById.put(uid, flavor); + service.flavors.add(flavor); + + return flavor; + } + + @NotNull + public List<Image> queryImages() { + checkOpen(); + + return new ArrayList<>(service.images); + } + + public Image findImage(@NotNull UUID id) { + checkOpen(); + + return service.imageById.get(id); + } + + public Image newImage(@NotNull String name) { + return newImage(name, Collections.emptyMap(), Collections.emptyMap()); + } + + @NotNull + public Image newImage(@NotNull String name, @NotNull Map<String, String> labels, @NotNull Map<String, ?> meta) { + checkOpen(); + + final ComputeService service = this.service; + UUID uid = new UUID(service.clock.millis(), service.random.nextLong()); + + ServiceImage image = new ServiceImage(service, uid, name, labels, meta); + + service.imageById.put(uid, image); + service.images.add(image); + + return image; + } + + @NotNull + public ServiceTask newTask( + @NotNull String name, + @NotNull Flavor flavor, + @NotNull Workload workload, + @NotNull Map<String, ?> meta) { + checkOpen(); + + final ComputeService service = this.service; + UUID uid = new UUID(service.clock.millis(), service.random.nextLong()); + + final ServiceFlavor internalFlavor = + Objects.requireNonNull(service.flavorById.get(flavor.getUid()), "Unknown flavor"); + + ServiceTask task = new ServiceTask(service, uid, name, internalFlavor, workload, meta); + + service.taskById.put(uid, task); + service.tasks.add(task); + + service.tasksTotal++; + + task.start(); + + return task; + } + + @Nullable + public ServiceTask findTask(@NotNull UUID id) { + checkOpen(); + return service.taskById.get(id); + } + + @NotNull + public List<ServiceTask> queryTasks() { + checkOpen(); + + return new ArrayList<>(service.tasks); + } + + public void close() { + isClosed = true; + } + + @Override + public String toString() { + return "ComputeService.Client"; + } + + @Nullable + public void rescheduleTask(@NotNull ServiceTask task, @NotNull Workload workload) { + ServiceTask internalTask = findTask(task.getUid()); + // SimHost from = service.lookupHost(internalTask); + + // from.delete(internalTask); + + internalTask.host = null; + + internalTask.setWorkload(workload); + internalTask.start(); + } + } + + /** + * A request to schedule a {@link ServiceTask} onto one of the {@link SimHost}s. + */ + static class SchedulingRequest { + final ServiceTask task; + final long submitTime; + + boolean isCancelled; + + SchedulingRequest(ServiceTask task, long submitTime) { + this.task = task; + this.submitTime = submitTime; + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java new file mode 100644 index 00000000..f4aa9c70 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +import org.opendc.compute.simulator.host.SimHost; + +/** + * A view of a {@link SimHost} as seen from the {@link ComputeService}. + */ +public class HostView { + private final SimHost host; + int instanceCount; + long availableMemory; + int provisionedCores; + + /** + * Construct a {@link HostView} instance. + * + * @param host The host to create a view of. + */ + public HostView(SimHost host) { + this.host = host; + this.availableMemory = host.getModel().memoryCapacity(); + } + + /** + * The {@link SimHost} this is a view of. + */ + public SimHost getHost() { + return host; + } + + /** + * Return the number of instances on this host. + */ + public int getInstanceCount() { + return instanceCount; + } + + /** + * Return the available memory of the host. + */ + public long getAvailableMemory() { + return availableMemory; + } + + /** + * Return the provisioned cores on the host. + */ + public int getProvisionedCores() { + return provisionedCores; + } + + @Override + public String toString() { + return "HostView[host=" + host + "]"; + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java new file mode 100644 index 00000000..eddde87e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.jetbrains.annotations.NotNull; +import org.opendc.compute.api.Flavor; + +/** + * Implementation of {@link Flavor} provided by {@link ComputeService}. + */ +public final class ServiceFlavor implements Flavor { + private final ComputeService service; + private final UUID uid; + private final String name; + private final int coreCount; + private final long memorySize; + private final Map<String, ?> meta; + + ServiceFlavor(ComputeService service, UUID uid, String name, int coreCount, long memorySize, Map<String, ?> meta) { + this.service = service; + this.uid = uid; + this.name = name; + this.coreCount = coreCount; + this.memorySize = memorySize; + this.meta = meta; + } + + @Override + public int getCoreCount() { + return coreCount; + } + + @Override + public long getMemorySize() { + return memorySize; + } + + @NotNull + @Override + public UUID getUid() { + return uid; + } + + @NotNull + @Override + public String getName() { + return name; + } + + @NotNull + @Override + public Map<String, Object> getMeta() { + return Collections.unmodifiableMap(meta); + } + + @Override + public void reload() { + // No-op: this object is the source-of-truth + } + + @Override + public void delete() { + service.delete(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ServiceFlavor flavor = (ServiceFlavor) o; + return service.equals(flavor.service) && uid.equals(flavor.uid); + } + + @Override + public int hashCode() { + return Objects.hash(service, uid); + } + + @Override + public String toString() { + return "Flavor[uid=" + uid + ",name=" + name + "]"; + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java new file mode 100644 index 00000000..dffa4356 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.jetbrains.annotations.NotNull; +import org.opendc.compute.api.Image; + +/** + * Implementation of {@link Image} provided by {@link ComputeService}. + */ +public final class ServiceImage implements Image { + private final ComputeService service; + private final UUID uid; + private final String name; + private final Map<String, String> labels; + private final Map<String, ?> meta; + + ServiceImage(ComputeService service, UUID uid, String name, Map<String, String> labels, Map<String, ?> meta) { + this.service = service; + this.uid = uid; + this.name = name; + this.labels = labels; + this.meta = meta; + } + + @NotNull + @Override + public UUID getUid() { + return uid; + } + + @NotNull + @Override + public String getName() { + return name; + } + + @NotNull + @Override + public Map<String, Object> getMeta() { + return Collections.unmodifiableMap(meta); + } + + @Override + public void reload() { + // No-op: this object is the source-of-truth + } + + @Override + public void delete() { + service.delete(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ServiceImage image = (ServiceImage) o; + return service.equals(image.service) && uid.equals(image.uid); + } + + @Override + public int hashCode() { + return Objects.hash(service, uid); + } + + @Override + public String toString() { + return "Image[uid=" + uid + ",name=" + name + "]"; + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java new file mode 100644 index 00000000..f39142eb --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.service; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.opendc.compute.api.TaskState; +import org.opendc.compute.simulator.TaskWatcher; +import org.opendc.compute.simulator.host.SimHost; +import org.opendc.simulator.compute.workload.Workload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link ServiceTask} provided by {@link ComputeService}. + */ +public class ServiceTask { + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTask.class); + + private final ComputeService service; + private final UUID uid; + + private final String name; + private final ServiceFlavor flavor; + public Workload workload; + + private Map<String, ?> meta; // TODO: remove this + + private final List<TaskWatcher> watchers = new ArrayList<>(); + private TaskState state = TaskState.CREATED; + Instant launchedAt = null; + Instant createdAt; + Instant finishedAt; + SimHost host = null; + private ComputeService.SchedulingRequest request = null; + + private int numFailures = 0; + + ServiceTask( + ComputeService service, + UUID uid, + String name, + ServiceFlavor flavor, + Workload workload, + Map<String, ?> meta) { + this.service = service; + this.uid = uid; + this.name = name; + this.flavor = flavor; + this.workload = workload; + this.meta = meta; + + this.createdAt = this.service.getClock().instant(); + } + + @NotNull + public UUID getUid() { + return uid; + } + + @NotNull + public String getName() { + return name; + } + + @NotNull + public ServiceFlavor getFlavor() { + return flavor; + } + + @NotNull + public Map<String, Object> getMeta() { + return Collections.unmodifiableMap(meta); + } + + public void setWorkload(Workload newWorkload) { + this.workload = newWorkload; + } + + @NotNull + public TaskState getState() { + return state; + } + + @Nullable + public Instant getLaunchedAt() { + return launchedAt; + } + + @Nullable + public Instant getCreatedAt() { + return createdAt; + } + + @Nullable + public Instant getFinishedAt() { + return finishedAt; + } + + /** + * Return the {@link SimHost} on which the task is running or <code>null</code> if it is not running on a host. + */ + public SimHost getHost() { + return host; + } + + public void setHost(SimHost host) { + this.host = host; + } + + public int getNumFailures() { + return this.numFailures; + } + + public void start() { + switch (state) { + case PROVISIONING: + LOGGER.debug("User tried to start task but request is already pending: doing nothing"); + case RUNNING: + LOGGER.debug("User tried to start task but task is already running"); + break; + case COMPLETED: + case TERMINATED: + LOGGER.warn("User tried to start deleted task"); + throw new IllegalStateException("Task is deleted"); + case CREATED: + LOGGER.info("User requested to start task {}", uid); + setState(TaskState.PROVISIONING); + assert request == null : "Scheduling request already active"; + request = service.schedule(this); + break; + case FAILED: + LOGGER.info("User requested to start task after failure {}", uid); + setState(TaskState.PROVISIONING); + request = service.schedule(this); + break; + } + } + + public void watch(@NotNull TaskWatcher watcher) { + watchers.add(watcher); + } + + public void unwatch(@NotNull TaskWatcher watcher) { + watchers.remove(watcher); + } + + public void delete() { + cancelProvisioningRequest(); + final SimHost host = this.host; + if (host != null) { + host.delete(this); + } + service.delete(this); + + this.setState(TaskState.DELETED); + } + + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ServiceTask task = (ServiceTask) o; + return service.equals(task.service) && uid.equals(task.uid); + } + + public int hashCode() { + return Objects.hash(service, uid); + } + + public String toString() { + return "Task[uid=" + uid + ",name=" + name + ",state=" + state + "]"; + } + + void setState(TaskState newState) { + if (this.state == newState) { + return; + } + + for (TaskWatcher watcher : watchers) { + watcher.onStateChanged(this, newState); + } + if (newState == TaskState.FAILED) { + this.numFailures++; + } + + if ((newState == TaskState.COMPLETED) || newState == TaskState.FAILED) { + this.finishedAt = this.service.getClock().instant(); + } + + this.state = newState; + } + + /** + * Cancel the provisioning request if active. + */ + private void cancelProvisioningRequest() { + final ComputeService.SchedulingRequest request = this.request; + if (request != null) { + this.request = null; + request.isCancelled = true; + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java new file mode 100644 index 00000000..ea37f5f2 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry; + +/** + * Statistics about the CPUs of a guest. + * + * @param activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running. + * @param idleTime The cumulative time (in seconds) the CPUs of the guest were idle. + * @param stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host. + * @param lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines. + * @param capacity The available CPU capacity of the guest (in MHz). + * @param usage Amount of CPU resources (in MHz) actually used by the guest. + * @param utilization The utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public record GuestCpuStats( + long activeTime, + long idleTime, + long stealTime, + long lostTime, + float capacity, + float usage, + float utilization) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java new file mode 100644 index 00000000..0d51e223 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry; + +import java.time.Duration; +import java.time.Instant; + +/** + * System-level statistics of a guest. + * + * @param uptime The cumulative uptime of the guest since last boot (in ms). + * @param downtime The cumulative downtime of the guest since last boot (in ms). + * @param bootTime The time at which the guest booted. + */ +public record GuestSystemStats(Duration uptime, Duration downtime, Instant bootTime) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java new file mode 100644 index 00000000..3f2aab78 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry; + +/** + * Statistics about the CPUs of a host. + * + * @param activeTime The cumulative time (in seconds) that the CPUs of the host were actively running. + * @param idleTime The cumulative time (in seconds) the CPUs of the host were idle. + * @param stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to. + * @param lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines. + * @param capacity The available CPU capacity of the host (in MHz). + * @param demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU + * limits. + * @param usage Amount of CPU resources (in MHz) actually used by the host. + * @param utilization The utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public record HostCpuStats( + long activeTime, + long idleTime, + long stealTime, + long lostTime, + float capacity, + float demand, + float usage, + float utilization) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java new file mode 100644 index 00000000..353e62fa --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry; + +import java.time.Duration; +import java.time.Instant; + +/** + * System-level statistics of a host. + * + * @param uptime The cumulative uptime of the host since last boot (in ms). + * @param downtime The cumulative downtime of the host since last boot (in ms). + * @param bootTime The time at which the task started. + * @param powerDraw Instantaneous power draw of the system (in W). + * @param energyUsage The cumulative energy usage of the system (in J). + * @param guestsTerminated The number of guests that are in a terminated state. + * @param guestsRunning The number of guests that are in a running state. + * @param guestsError The number of guests that are in an error state. + * @param guestsInvalid The number of guests that are in an unknown state. + */ +public record HostSystemStats( + Duration uptime, + Duration downtime, + Instant bootTime, + float powerDraw, + float energyUsage, + int guestsTerminated, + int guestsRunning, + int guestsError, + int guestsInvalid) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java new file mode 100644 index 00000000..9d44a4b8 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry; + +/** + * Statistics about the scheduling component of the [ComputeService]. + * + * @param hostsAvailable The number of hosts currently available for scheduling. + * @param hostsUnavailable The number of hosts unavailable for scheduling. + * @param attemptsSuccess Scheduling attempts that resulted into an allocation onto a host. + * @param attemptsFailure The number of failed scheduling attempt due to any reason + * @param tasksTotal The number of tasks registered with the service. + * @param tasksPending The number of tasks that are pending to be scheduled. + * @param tasksActive The number of tasks that are currently managed by the service and running. + */ +public record SchedulerStats( + int hostsAvailable, + int hostsUnavailable, + long attemptsSuccess, + long attemptsFailure, + int tasksTotal, + int tasksPending, + int tasksActive, + int tasksCompleted, + int tasksTerminated) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt deleted file mode 100644 index ca72c910..00000000 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -/** - * A mutable [ServiceRegistry]. - */ -public interface MutableServiceRegistry : ServiceRegistry { - /** - * Register [service] for the specified [name] in this registry. - * - * @param name The name of the service to register, which should follow the rules for domain names as defined by - * DNS. - * @param type The interface provided by the service. - * @param service A reference to the actual implementation of the service. - */ - public fun <T : Any> register( - name: String, - type: Class<T>, - service: T, - ) - - /** - * Remove the service with [name] and [type] from this registry. - * - * @param name The name of the service to remove, which should follow the rules for domain names as defined by DNS. - * @param type The type of the service to remove. - */ - public fun remove( - name: String, - type: Class<*>, - ) - - /** - * Remove all services registered with [name]. - * - * @param name The name of the services to remove, which should follow the rules for domain names as defined by DNS. - */ - public fun remove(name: String) - - /** - * Create a copy of the registry. - */ - public override fun clone(): MutableServiceRegistry -} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt index 5a4bced1..e2f6c9d0 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt @@ -23,26 +23,53 @@ package org.opendc.compute.simulator /** - * A read-only registry of services used during experiments to resolve services. - * - * The service registry is similar conceptually to the Domain Name System (DNS), which is a naming system used to - * identify computers reachable via the Internet. The service registry should be used in a similar fashion. + * Implementation of the [ServiceRegistry] interface. */ -public interface ServiceRegistry { - /** - * Lookup the service with the specified [name] and [type]. - * - * @param name The name of the service to resolve, which should follow the rules for domain names as defined by DNS. - * @param type The type of the service to resolve, identified by the interface that is implemented by the service. - * @return The service with specified [name] and implementing [type] or `null` if it does not exist. - */ +public class ServiceRegistry(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) { public fun <T : Any> resolve( name: String, type: Class<T>, - ): T? + ): T? { + val servicesForName = registry[name] ?: return null + + @Suppress("UNCHECKED_CAST") + return servicesForName[type] as T? + } + + public fun <T : Any> register( + name: String, + type: Class<T>, + service: T, + ) { + val services = registry.computeIfAbsent(name) { mutableMapOf() } + + if (type in services) { + throw IllegalStateException("Duplicate service $type registered for name $name") + } + + services[type] = service + } + + public fun remove( + name: String, + type: Class<*>, + ) { + val services = registry[name] ?: return + services.remove(type) + } + + public fun remove(name: String) { + registry.remove(name) + } + + public fun clone(): ServiceRegistry { + val res = mutableMapOf<String, MutableMap<Class<*>, Any>>() + registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() } + return ServiceRegistry(res) + } - /** - * Create a copy of the registry. - */ - public fun clone(): ServiceRegistry + override fun toString(): String { + val entries = registry.map { "${it.key}=${it.value}" }.joinToString() + return "ServiceRegistry{$entries}" + } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt deleted file mode 100644 index bf3ee43f..00000000 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -/** - * Implementation of the [MutableServiceRegistry] interface. - */ -internal class ServiceRegistryImpl(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) : - MutableServiceRegistry { - override fun <T : Any> resolve( - name: String, - type: Class<T>, - ): T? { - val servicesForName = registry[name] ?: return null - - @Suppress("UNCHECKED_CAST") - return servicesForName[type] as T? - } - - override fun <T : Any> register( - name: String, - type: Class<T>, - service: T, - ) { - val services = registry.computeIfAbsent(name) { mutableMapOf() } - - if (type in services) { - throw IllegalStateException("Duplicate service $type registered for name $name") - } - - services[type] = service - } - - override fun remove( - name: String, - type: Class<*>, - ) { - val services = registry[name] ?: return - services.remove(type) - } - - override fun remove(name: String) { - registry.remove(name) - } - - override fun clone(): MutableServiceRegistry { - val res = mutableMapOf<String, MutableMap<Class<*>, Any>>() - registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() } - return ServiceRegistryImpl(res) - } - - override fun toString(): String { - val entries = registry.map { "${it.key}=${it.value}" }.joinToString() - return "ServiceRegistry{$entries}" - } -} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt deleted file mode 100644 index e681403c..00000000 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Task -import org.opendc.compute.api.TaskState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostModel -import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.driver.telemetry.GuestCpuStats -import org.opendc.compute.service.driver.telemetry.GuestSystemStats -import org.opendc.compute.service.driver.telemetry.HostCpuStats -import org.opendc.compute.service.driver.telemetry.HostSystemStats -import org.opendc.compute.simulator.internal.DefaultWorkloadMapper -import org.opendc.compute.simulator.internal.Guest -import org.opendc.compute.simulator.internal.GuestListener -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.compute.workload.SimWorkloads -import java.time.Duration -import java.time.Instant -import java.time.InstantSource -import java.util.UUID -import java.util.function.Supplier - -/** - * A [Host] implementation that simulates virtual machines on a physical machine using [SimHypervisor]. - * - * @param uid The unique identifier of the host. - * @param name The name of the host. - * @param meta The metadata of the host. - * @param clock The (virtual) clock used to track time. - * @param machine The [SimBareMetalMachine] on which the host runs. - * @param hypervisor The [SimHypervisor] to run on top of the machine. - * @param mapper A [SimWorkloadMapper] to map a [Task] to a [SimWorkload]. - * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor. - * @param optimize A flag to indicate to optimize the machine models of the virtual machines. - */ -public class SimHost( - private val uid: UUID, - private val name: String, - private val meta: Map<String, Any>, - private val clock: InstantSource, - private val machine: SimBareMetalMachine, - private val hypervisor: SimHypervisor, - private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, - private val bootModel: Supplier<SimWorkload?> = Supplier { null }, - private val optimize: Boolean = false, -) : Host, AutoCloseable { - /** - * The event listeners registered with this host. - */ - private val listeners = mutableListOf<HostListener>() - - /** - * The virtual machines running on the hypervisor. - */ - private val guests = HashMap<Task, Guest>() - private val localGuests = mutableListOf<Guest>() - - private var localState: HostState = HostState.DOWN - set(value) { - if (value != field) { - listeners.forEach { it.onStateChanged(this, value) } - } - field = value - } - - private val model: HostModel = - HostModel( - machine.model.cpu.totalCapacity, - machine.model.cpu.coreCount, - machine.model.memory.size, - ) - - /** - * The [GuestListener] that listens for guest events. - */ - private val guestListener = - object : GuestListener { - override fun onStart(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } - } - - override fun onStop(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } - } - } - - init { - launch() - } - - override fun getUid(): UUID { - return uid - } - - override fun getName(): String { - return name - } - - override fun getModel(): HostModel { - return model - } - - override fun getMeta(): Map<String, *> { - return meta - } - - override fun getState(): HostState { - return localState - } - - override fun getInstances(): Set<Task> { - return guests.keys - } - - override fun canFit(task: Task): Boolean { - val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize - val enoughCpus = model.coreCount >= task.flavor.coreCount - val canFit = hypervisor.canFit(task.flavor.toMachineModel()) - - return sufficientMemory && enoughCpus && canFit - } - - override fun spawn(task: Task) { - guests.computeIfAbsent(task) { key -> - require(canFit(key)) { "Task does not fit" } - - val machine = hypervisor.newMachine(key.flavor.toMachineModel()) - val newGuest = - Guest( - clock, - this, - hypervisor, - mapper, - guestListener, - task, - machine, - ) - - localGuests.add(newGuest) - newGuest - } - } - - override fun contains(task: Task): Boolean { - return task in guests - } - - override fun start(task: Task) { - val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } - guest.start() - } - - override fun stop(task: Task) { - val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } - guest.stop() - } - - override fun delete(task: Task) { - val guest = guests[task] ?: return - guest.delete() - - guests.remove(task) - localGuests.remove(guest) - } - - override fun addListener(listener: HostListener) { - listeners.add(listener) - } - - override fun removeListener(listener: HostListener) { - listeners.remove(listener) - } - - override fun close() { - reset(HostState.DOWN) - machine.cancel() - } - - override fun getSystemStats(): HostSystemStats { - updateUptime() - - var terminated = 0 - var running = 0 - var error = 0 - var invalid = 0 - - val guests = localGuests.listIterator() - for (guest in guests) { - when (guest.state) { - TaskState.TERMINATED -> terminated++ - TaskState.RUNNING -> running++ - TaskState.ERROR -> error++ - TaskState.DELETED -> { - // Remove guests that have been deleted - this.guests.remove(guest.task) - guests.remove() - } - else -> invalid++ - } - } - - return HostSystemStats( - Duration.ofMillis(localUptime), - Duration.ofMillis(localDowntime), - localBootTime, - machine.psu.powerDraw, - machine.psu.energyUsage, - terminated, - running, - error, - invalid, - ) - } - - override fun getSystemStats(task: Task): GuestSystemStats { - val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } - return guest.getSystemStats() - } - - override fun getCpuStats(): HostCpuStats { - val counters = hypervisor.counters - counters.sync() - - return HostCpuStats( - counters.cpuActiveTime, - counters.cpuIdleTime, - counters.cpuStealTime, - counters.cpuLostTime, - hypervisor.cpuCapacity, - hypervisor.cpuDemand, - hypervisor.cpuUsage, - hypervisor.cpuUsage / localCpuLimit, - ) - } - - override fun getCpuStats(task: Task): GuestCpuStats { - val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } - return guest.getCpuStats() - } - - override fun hashCode(): Int = uid.hashCode() - - override fun equals(other: Any?): Boolean { - return other is SimHost && uid == other.uid - } - - override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" - - public fun fail() { - reset(HostState.ERROR) - - for (guest in localGuests) { - guest.fail() - } - } - - public fun recover() { - updateUptime() - - launch() - } - - /** - * The [SimMachineContext] that represents the machine running the hypervisor. - */ - private var ctx: SimMachineContext? = null - - /** - * Launch the hypervisor. - */ - private fun launch() { - check(ctx == null) { "Concurrent hypervisor running" } - - val bootWorkload = bootModel.get() - val hypervisor = hypervisor - val hypervisorWorkload = - object : SimWorkload by hypervisor { - override fun onStart(ctx: SimMachineContext) { - try { - localBootTime = clock.instant() - localState = HostState.UP - hypervisor.onStart(ctx) - - // Recover the guests that were running on the hypervisor. - for (guest in localGuests) { - guest.recover() - } - } catch (cause: Throwable) { - localState = HostState.ERROR - throw cause - } - } - } - - val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload - - // Launch hypervisor onto machine - ctx = - machine.startWorkload(workload, emptyMap()) { cause -> - localState = if (cause != null) HostState.ERROR else HostState.DOWN - ctx = null - } - } - - /** - * Reset the machine. - */ - private fun reset(state: HostState) { - updateUptime() - - // Stop the hypervisor - ctx?.shutdown() - localState = state - } - - /** - * Convert flavor to machine model. - */ - private fun Flavor.toMachineModel(): MachineModel { - return MachineModel(machine.model.cpu, MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - } - - private var localLastReport = clock.millis() - private var localUptime = 0L - private var localDowntime = 0L - private var localBootTime: Instant? = null - private val localCpuLimit = machine.model.cpu.totalCapacity - - /** - * Helper function to track the uptime of a machine. - */ - private fun updateUptime() { - val now = clock.millis() - val duration = now - localLastReport - localLastReport = now - - if (localState == HostState.UP) { - localUptime += duration - } else if (localState == HostState.ERROR) { - // Only increment downtime if the machine is in a failure state - localDowntime += duration - } - - val guests = localGuests - for (i in guests.indices) { - guests[i].updateUptime() - } - } -} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/TaskWatcher.kt index a85091a0..9fe4dff5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/TaskWatcher.kt @@ -22,16 +22,21 @@ package org.opendc.compute.simulator -import org.opendc.compute.api.Image -import org.opendc.compute.api.Task -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.service.ServiceTask /** - * A [SimWorkloadMapper] is responsible for mapping a [Task] and [Image] to a [SimWorkload] that can be simulated. + * An interface used to watch the state of [ServiceTask] instances. */ -public fun interface SimWorkloadMapper { +public interface TaskWatcher { /** - * Map the specified [task] to a [SimWorkload] that can be simulated. + * This method is invoked when the state of a [ServiceTask] changes. + * + * @param task The task whose state has changed. + * @param newState The new state of the task. */ - public fun createWorkload(task: Task): SimWorkload + public fun onStateChanged( + task: ServiceTask, + newState: TaskState, + ) {} } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt new file mode 100644 index 00000000..31ff384c --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host + +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.simulator.telemetry.GuestCpuStats +import org.opendc.compute.simulator.telemetry.GuestSystemStats +import org.opendc.compute.simulator.telemetry.HostCpuStats +import org.opendc.compute.simulator.telemetry.HostSystemStats +import org.opendc.simulator.compute.cpu.CpuPowerModel +import org.opendc.simulator.compute.machine.SimMachine +import org.opendc.simulator.compute.models.MachineModel +import org.opendc.simulator.compute.models.MemoryUnit +import org.opendc.simulator.engine.FlowGraph +import java.time.Duration +import java.time.Instant +import java.time.InstantSource +import java.util.UUID + +/** + * A [Host] implementation that simulates virtual machines on a physical machine. + * + * @param uid The unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param clock The (virtual) clock used to track time. + * @param graph The Flow Graph that the Host is part of + * @param machineModel The static model of the host + * @param powerModel The static powerModel of the CPU TODO: can this be combined with machinemodel? + * @constructor Create empty Sim host + */ +public class SimHost( + private val uid: UUID, + private val name: String, + private val meta: Map<String, Any>, + private val clock: InstantSource, + private val graph: FlowGraph, + private val machineModel: MachineModel, + private val powerModel: CpuPowerModel, +) : AutoCloseable { + /** + * The event listeners registered with this host. + */ + private val hostListeners = mutableListOf<HostListener>() + + /** + * The virtual machines running on the hypervisor. + */ + private val taskToGuestMap = HashMap<ServiceTask, Guest>() + private val guests = mutableListOf<Guest>() + + private var hostState: HostState = HostState.DOWN + set(value) { + if (value != field) { + hostListeners.forEach { it.onStateChanged(this, value) } + } + field = value + } + + private val model: HostModel = + HostModel( + machineModel.cpu.totalCapacity, + machineModel.cpu.coreCount, + machineModel.memory.size, + ) + + private var simMachine: SimMachine? = null + + /** + * The [GuestListener] that listens for guest events. + */ + private val guestListener = + object : GuestListener { + override fun onStart(guest: Guest) { + hostListeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } + } + + override fun onStop(guest: Guest) { + hostListeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } + } + } + + private var lastReport = clock.millis() + private var totalUptime = 0L + private var totalDowntime = 0L + private var bootTime: Instant? = null + private val cpuLimit = machineModel.cpu.totalCapacity + + init { + launch() + } + + /** + * Launch the hypervisor. + */ + private fun launch() { + bootTime = this.clock.instant() + hostState = HostState.UP + + if (this.simMachine != null) { + return + } + + this.simMachine = + SimMachine( + this.graph, + this.machineModel, + this.powerModel, + ) { cause -> + hostState = if (cause != null) HostState.ERROR else HostState.DOWN + } + } + + override fun close() { + reset(HostState.DOWN) + } + + public fun fail() { + reset(HostState.ERROR) + + // Fail the guest and delete them + // This weird loop is the only way I have been able to make it work. + while (guests.size > 0) { + val guest = guests[0] + guest.fail() + this.delete(guest.task) + } + } + + public fun recover() { + updateUptime() + + launch() + } + + /** + * Reset the machine. + */ + private fun reset(state: HostState) { + updateUptime() + + // Stop the hypervisor + hostState = state + } + + public fun getUid(): UUID { + return uid + } + + public fun getName(): String { + return name + } + + public fun getModel(): HostModel { + return model + } + + public fun getMeta(): Map<String, *> { + return meta + } + + public fun getState(): HostState { + return hostState + } + + public fun getInstances(): Set<ServiceTask> { + return taskToGuestMap.keys + } + + public fun getGuests(): List<Guest> { + return this.guests + } + + public fun canFit(task: ServiceTask): Boolean { + val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize + val enoughCpus = model.coreCount >= task.flavor.coreCount + val canFit = simMachine!!.canFit(task.flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + + /** + * Spawn A Virtual machine that run the Task and put this Task as a Guest on it + * + * @param task + */ + public fun spawn(task: ServiceTask) { + assert(simMachine != null) { "Tried start task $task while no SimMachine is active" } + + require(canFit(task)) { "Task does not fit" } + + val newGuest = + Guest( + clock, + this, + guestListener, + task, + simMachine!!, + ) + + guests.add(newGuest) + newGuest.start() + + taskToGuestMap.computeIfAbsent(task) { newGuest } + } + + public fun contains(task: ServiceTask): Boolean { + return task in taskToGuestMap + } + + public fun start(task: ServiceTask) { + val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" } + guest.start() + } + + public fun stop(task: ServiceTask) { + val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" } + guest.stop() + } + + public fun delete(task: ServiceTask) { + val guest = taskToGuestMap[task] ?: return + guest.delete() + + taskToGuestMap.remove(task) + guests.remove(guest) + task.host = null + } + + public fun removeTask(task: ServiceTask) { + val guest = taskToGuestMap[task] ?: return + guest.delete() + + taskToGuestMap.remove(task) + guests.remove(guest) + } + + public fun addListener(listener: HostListener) { + hostListeners.add(listener) + } + + public fun removeListener(listener: HostListener) { + hostListeners.remove(listener) + } + + public fun getSystemStats(): HostSystemStats { + updateUptime() + this.simMachine!!.psu.updateCounters() + + var terminated = 0 + var running = 0 + var failed = 0 + var invalid = 0 + var completed = 0 + + val guests = guests.listIterator() + for (guest in guests) { + when (guest.state) { + TaskState.RUNNING -> running++ + TaskState.COMPLETED, TaskState.FAILED, TaskState.TERMINATED -> { + failed++ + // Remove guests that have been deleted + this.taskToGuestMap.remove(guest.task) + guests.remove() + } + else -> invalid++ + } + } + + return HostSystemStats( + Duration.ofMillis(totalUptime), + Duration.ofMillis(totalDowntime), + bootTime, + simMachine!!.psu.powerDraw, + simMachine!!.psu.energyUsage, + terminated, + running, + failed, + invalid, + ) + } + + public fun getSystemStats(task: ServiceTask): GuestSystemStats { + val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" } + return guest.getSystemStats() + } + + public fun getCpuStats(): HostCpuStats { + simMachine!!.cpu.updateCounters(this.clock.millis()) + + val counters = simMachine!!.performanceCounters + + return HostCpuStats( + counters.cpuActiveTime, + counters.cpuIdleTime, + counters.cpuStealTime, + counters.cpuLostTime, + counters.cpuCapacity, + counters.cpuDemand, + counters.cpuSupply, + counters.cpuSupply / cpuLimit, + ) + } + + public fun getCpuStats(task: ServiceTask): GuestCpuStats { + val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" } + return guest.getCpuStats() + } + + override fun hashCode(): Int = uid.hashCode() + + override fun equals(other: Any?): Boolean { + return other is SimHost && uid == other.uid + } + + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): MachineModel { + return MachineModel(simMachine!!.machineModel.cpu, MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + } + + /** + * Helper function to track the uptime of a machine. + */ + private fun updateUptime() { + val now = clock.millis() + val duration = now - lastReport + lastReport = now + + if (hostState == HostState.UP) { + totalUptime += duration + } else if (hostState == HostState.ERROR) { + // Only increment downtime if the machine is in a failure state + totalDowntime += duration + } + + val guests = guests + for (i in guests.indices) { + guests[i].updateUptime() + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index cf6c146a..3a923222 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -23,15 +23,16 @@ package org.opendc.compute.simulator.internal import mu.KotlinLogging -import org.opendc.compute.api.Task import org.opendc.compute.api.TaskState -import org.opendc.compute.service.driver.telemetry.GuestCpuStats -import org.opendc.compute.service.driver.telemetry.GuestSystemStats -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.SimWorkloadMapper -import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.compute.simulator.host.SimHost +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.simulator.telemetry.GuestCpuStats +import org.opendc.compute.simulator.telemetry.GuestSystemStats +import org.opendc.simulator.compute.machine.SimMachine +import org.opendc.simulator.compute.machine.VirtualMachine +import org.opendc.simulator.compute.workload.ChainWorkload +import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.TraceWorkload import java.time.Duration import java.time.Instant import java.time.InstantSource @@ -39,14 +40,12 @@ import java.time.InstantSource /** * A virtual machine instance that is managed by a [SimHost]. */ -internal class Guest( +public class Guest( private val clock: InstantSource, - val host: SimHost, - private val hypervisor: SimHypervisor, - private val mapper: SimWorkloadMapper, + public val host: SimHost, private val listener: GuestListener, - val task: Task, - val machine: SimHypervisor.SimVirtualMachine, + public val task: ServiceTask, + public val simMachine: SimMachine, ) { /** * The state of the [Guest]. @@ -54,50 +53,133 @@ internal class Guest( * [TaskState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for * a task. */ - var state: TaskState = TaskState.TERMINATED + public var state: TaskState = TaskState.CREATED private set /** + * The [VirtualMachine] on which the task is currently running + */ + public var virtualMachine: VirtualMachine? = null + + private var uptime = 0L + private var downtime = 0L + private var lastReport = clock.millis() + private var bootTime: Instant? = null + private val cpuLimit = simMachine.cpu.cpuModel.totalCapacity + + /** * Start the guest. */ - fun start() { + public fun start() { when (state) { - TaskState.TERMINATED, TaskState.ERROR -> { + TaskState.CREATED, TaskState.FAILED -> { LOGGER.info { "User requested to start task ${task.uid}" } doStart() } TaskState.RUNNING -> return - TaskState.DELETED -> { - LOGGER.warn { "User tried to start deleted task" } - throw IllegalArgumentException("Task is deleted") + TaskState.COMPLETED, TaskState.TERMINATED -> { + LOGGER.warn { "User tried to start a finished task" } + throw IllegalArgumentException("Task is already finished") } else -> assert(false) { "Invalid state transition" } } } /** + * Launch the guest on the simulated Virtual machine + */ + private fun doStart() { + assert(virtualMachine == null) { "Concurrent job is already running" } + + onStart() + + val bootworkload = + TraceWorkload( + ArrayList( + listOf( + TraceFragment( + 1000000L, + 100000.0, + 1, + ), + ), + ), + 0, + 0, + 0.0, + ) + val newChainWorkload = + ChainWorkload( + ArrayList(listOf(task.workload)), + task.workload.checkpointInterval, + task.workload.checkpointDuration, + task.workload.checkpointIntervalScaling, + ) + + virtualMachine = + simMachine.startWorkload(newChainWorkload) { cause -> + onStop(if (cause != null) TaskState.FAILED else TaskState.COMPLETED) + } + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + bootTime = clock.instant() + state = TaskState.RUNNING + listener.onStart(this) + } + + /** * Stop the guest. */ - fun stop() { + public fun stop() { when (state) { - TaskState.RUNNING -> doStop(TaskState.TERMINATED) - TaskState.ERROR -> doRecover() - TaskState.TERMINATED, TaskState.DELETED -> return + TaskState.RUNNING -> doStop(TaskState.COMPLETED) + TaskState.FAILED -> state = TaskState.TERMINATED + TaskState.COMPLETED, TaskState.TERMINATED -> return else -> assert(false) { "Invalid state transition" } } } /** + * Attempt to stop the task and put it into [target] state. + */ + private fun doStop(target: TaskState) { + assert(virtualMachine != null) { "Invalid job state" } + val virtualMachine = this.virtualMachine ?: return + if (target == TaskState.FAILED) { + virtualMachine.shutdown(Exception("Task has failed")) + } else { + virtualMachine.shutdown() + } + + this.virtualMachine = null + + this.state = target + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: TaskState) { + updateUptime() + + state = target + listener.onStop(this) + } + + /** * Delete the guest. * * This operation will stop the guest if it is running on the host and remove all resources associated with the * guest. */ - fun delete() { + public fun delete() { stop() - state = TaskState.DELETED - hypervisor.removeMachine(machine) + state = TaskState.FAILED } /** @@ -105,19 +187,19 @@ internal class Guest( * * This operation forcibly stops the guest and puts the task into an error state. */ - fun fail() { + public fun fail() { if (state != TaskState.RUNNING) { return } - doStop(TaskState.ERROR) + doStop(TaskState.FAILED) } /** * Recover the guest if it is in an error state. */ - fun recover() { - if (state != TaskState.ERROR) { + public fun recover() { + if (state != TaskState.FAILED) { return } @@ -127,117 +209,46 @@ internal class Guest( /** * Obtain the system statistics of this guest. */ - fun getSystemStats(): GuestSystemStats { + public fun getSystemStats(): GuestSystemStats { updateUptime() return GuestSystemStats( - Duration.ofMillis(localUptime), - Duration.ofMillis(localDowntime), - localBootTime, + Duration.ofMillis(uptime), + Duration.ofMillis(downtime), + bootTime, ) } /** * Obtain the CPU statistics of this guest. */ - fun getCpuStats(): GuestCpuStats { - val counters = machine.counters - counters.sync() + public fun getCpuStats(): GuestCpuStats { + virtualMachine!!.updateCounters(this.clock.millis()) + val counters = virtualMachine!!.performanceCounters return GuestCpuStats( counters.cpuActiveTime / 1000L, counters.cpuIdleTime / 1000L, counters.cpuStealTime / 1000L, counters.cpuLostTime / 1000L, - machine.cpuCapacity, - machine.cpuUsage, - machine.cpuUsage / localCpuLimit, + counters.cpuCapacity, + counters.cpuSupply, + counters.cpuSupply / cpuLimit, ) } /** - * The [SimMachineContext] representing the current active virtual machine instance or `null` if no virtual machine - * is active. - */ - private var ctx: SimMachineContext? = null - - /** - * Launch the guest on the simulated - */ - private fun doStart() { - assert(ctx == null) { "Concurrent job running" } - - onStart() - - val workload: SimWorkload = mapper.createWorkload(task) - workload.setOffset(clock.millis()) - val meta = mapOf("driver" to host, "task" to task) + task.meta - ctx = - machine.startWorkload(workload, meta) { cause -> - onStop(if (cause != null) TaskState.ERROR else TaskState.TERMINATED) - ctx = null - } - } - - /** - * Attempt to stop the task and put it into [target] state. - */ - private fun doStop(target: TaskState) { - assert(ctx != null) { "Invalid job state" } - val ctx = ctx ?: return - if (target == TaskState.ERROR) { - ctx.shutdown(Exception("Stopped because of ERROR")) - } else { - ctx.shutdown() - } - - state = target - } - - /** - * Attempt to recover from an error state. - */ - private fun doRecover() { - state = TaskState.TERMINATED - } - - /** - * This method is invoked when the guest was started on the host and has booted into a running state. - */ - private fun onStart() { - localBootTime = clock.instant() - state = TaskState.RUNNING - listener.onStart(this) - } - - /** - * This method is invoked when the guest stopped. - */ - private fun onStop(target: TaskState) { - updateUptime() - - state = target - listener.onStop(this) - } - - private var localUptime = 0L - private var localDowntime = 0L - private var localLastReport = clock.millis() - private var localBootTime: Instant? = null - private val localCpuLimit = machine.model.cpu.totalCapacity - - /** * Helper function to track the uptime and downtime of the guest. */ - fun updateUptime() { + public fun updateUptime() { val now = clock.millis() - val duration = now - localLastReport - localLastReport = now + val duration = now - lastReport + lastReport = now if (state == TaskState.RUNNING) { - localUptime += duration - } else if (state == TaskState.ERROR) { - localDowntime += duration + uptime += duration + } else if (state == TaskState.FAILED) { + downtime += duration } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt index e6d0fdad..895d78f9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -25,14 +25,14 @@ package org.opendc.compute.simulator.internal /** * Helper interface to listen for [Guest] events. */ -internal interface GuestListener { +public interface GuestListener { /** * This method is invoked when the guest machine is running. */ - fun onStart(guest: Guest) + public fun onStart(guest: Guest) /** * This method is invoked when the guest machine is stopped. */ - fun onStop(guest: Guest) + public fun onStop(guest: Guest) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt index f1123742..f295f522 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -23,9 +23,9 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.carbon.CarbonTrace -import org.opendc.compute.service.ComputeService -import org.opendc.compute.telemetry.ComputeMetricReader -import org.opendc.compute.telemetry.ComputeMonitor +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMetricReader +import org.opendc.compute.simulator.telemetry.ComputeMonitor import java.time.Duration /** @@ -44,7 +44,15 @@ public class ComputeMonitorProvisioningStep( requireNotNull( ctx.registry.resolve(serviceDomain, ComputeService::class.java), ) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval, startTime, carbonTrace) + val metricReader = + ComputeMetricReader( + ctx.dispatcher, + service, + monitor, + exportInterval, + startTime, + carbonTrace, + ) return AutoCloseable { metricReader.close() } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt index 645c9d46..6bdb131f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.provisioner -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.scheduler.ComputeScheduler +import org.opendc.compute.simulator.service.ComputeService import java.time.Duration /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index afde8219..07db3d26 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -25,9 +25,8 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.carbon.CarbonTrace -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.telemetry.ComputeMonitor +import org.opendc.compute.simulator.scheduler.ComputeScheduler +import org.opendc.compute.simulator.telemetry.ComputeMonitor import org.opendc.compute.topology.specs.HostSpec import java.time.Duration @@ -41,7 +40,7 @@ import java.time.Duration public fun setupComputeService( serviceDomain: String, scheduler: (ProvisioningContext) -> ComputeScheduler, - schedulingQuantum: Duration = Duration.ofMinutes(5), + schedulingQuantum: Duration = Duration.ofSeconds(1), maxNumFailures: Int = 10, ): ProvisioningStep { return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum, maxNumFailures) @@ -76,7 +75,6 @@ public fun registerComputeMonitor( public fun setupHosts( serviceDomain: String, specs: List<HostSpec>, - optimize: Boolean = false, ): ProvisioningStep { - return HostsProvisioningStep(serviceDomain, specs, optimize) + return HostsProvisioningStep(serviceDomain, specs) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index a80be634..19674d5e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -22,13 +22,10 @@ package org.opendc.compute.simulator.provisioner -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.host.SimHost +import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.topology.specs.HostSpec -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.flow2.FlowEngine -import java.util.SplittableRandom +import org.opendc.simulator.engine.FlowEngine /** * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. @@ -40,30 +37,27 @@ import java.util.SplittableRandom public class HostsProvisioningStep internal constructor( private val serviceDomain: String, private val specs: List<HostSpec>, - private val optimize: Boolean, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull( ctx.registry.resolve(serviceDomain, ComputeService::class.java), ) { "Compute service $serviceDomain does not exist" } - val engine = FlowEngine.create(ctx.dispatcher) - val graph = engine.newGraph() val hosts = mutableSetOf<SimHost>() - for (spec in specs) { - val machine = SimBareMetalMachine.create(graph, spec.model, spec.psuFactory) - val hypervisor = SimHypervisor.create(spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong())) + val flowEngine = FlowEngine.create(ctx.dispatcher) + val flowGraph = flowEngine.newGraph() + for (spec in specs) { val host = SimHost( spec.uid, spec.name, spec.meta, ctx.dispatcher.timeSource, - machine, - hypervisor, - optimize = optimize, + flowGraph, + spec.model, + spec.cpuPowerModel, ) require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt index 58d3a8c2..2e76478e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt @@ -23,9 +23,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.common.Dispatcher -import org.opendc.compute.simulator.MutableServiceRegistry import org.opendc.compute.simulator.ServiceRegistry -import org.opendc.compute.simulator.ServiceRegistryImpl import java.util.ArrayDeque import java.util.SplittableRandom @@ -47,7 +45,7 @@ public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { object : ProvisioningContext { override val dispatcher: Dispatcher = dispatcher override val seeder: SplittableRandom = SplittableRandom(seed) - override val registry: MutableServiceRegistry = ServiceRegistryImpl() + override val registry: ServiceRegistry = ServiceRegistry() override fun toString(): String = "Provisioner.ProvisioningContext" } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt index 1788c8e2..20c441c4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt @@ -23,7 +23,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.common.Dispatcher -import org.opendc.compute.simulator.MutableServiceRegistry +import org.opendc.compute.simulator.ServiceRegistry import java.util.SplittableRandom import java.util.random.RandomGenerator @@ -44,7 +44,7 @@ public interface ProvisioningContext { public val seeder: RandomGenerator /** - * A [MutableServiceRegistry] where the provisioned services are registered. + * A [ServiceRegistry] where the provisioned services are registered. */ - public val registry: MutableServiceRegistry + public val registry: ServiceRegistry } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt new file mode 100644 index 00000000..f0a2c3b4 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A generic scheduler interface used by the [ComputeService] to select hosts to place [ServiceTask]s on. + */ +public interface ComputeScheduler { + /** + * Register the specified [host] to be used for scheduling. + */ + public fun addHost(host: HostView) + + /** + * Remove the specified [host] to be removed from the scheduling pool. + */ + public fun removeHost(host: HostView) + + /** + * Select a host for the specified [task]. + * + * @param task The server to select a host for. + * @return The host to schedule the server on or `null` if no server is available. + */ + public fun select(task: ServiceTask): HostView? +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt new file mode 100644 index 00000000..ec3aedcb --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") + +package org.opendc.compute.simulator.scheduler + +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.simulator.scheduler.weights.RamWeigher +import org.opendc.compute.simulator.scheduler.weights.VCpuWeigher +import java.util.SplittableRandom +import java.util.random.RandomGenerator + +public enum class ComputeSchedulerEnum { + Mem, + MemInv, + CoreMem, + CoreMemInv, + ActiveServers, + ActiveServersInv, + ProvisionedCores, + ProvisionedCoresInv, + Random, + Replay, +} + +public fun createComputeScheduler( + name: String, + seeder: RandomGenerator, + placements: Map<String, String> = emptyMap(), +): ComputeScheduler { + return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder, placements) +} + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler( + name: ComputeSchedulerEnum, + seeder: RandomGenerator, + placements: Map<String, String> = emptyMap(), +): ComputeScheduler { + val cpuAllocationRatio = 1.0 + val ramAllocationRatio = 1.5 + return when (name) { + ComputeSchedulerEnum.Mem -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)), + ) + ComputeSchedulerEnum.MemInv -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)), + ) + ComputeSchedulerEnum.CoreMem -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + ComputeSchedulerEnum.CoreMemInv -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)), + ) + ComputeSchedulerEnum.ActiveServers -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) + ComputeSchedulerEnum.ActiveServersInv -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)), + ) + ComputeSchedulerEnum.ProvisionedCores -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)), + ) + ComputeSchedulerEnum.ProvisionedCoresInv -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)), + ) + ComputeSchedulerEnum.Random -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = SplittableRandom(seeder.nextLong()), + ) + ComputeSchedulerEnum.Replay -> ReplayScheduler(placements) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt new file mode 100644 index 00000000..9fd3a862 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import org.opendc.compute.simulator.scheduler.filters.HostFilter +import org.opendc.compute.simulator.scheduler.weights.HostWeigher +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import java.util.SplittableRandom +import java.util.random.RandomGenerator +import kotlin.math.min + +/** + * A [ComputeScheduler] implementation that uses filtering and weighing passes to select + * the host to schedule a [ServiceTask] on. + * + * This implementation is based on the filter scheduler from OpenStack Nova. + * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html + * + * @param filters The list of filters to apply when searching for an appropriate host. + * @param weighers The list of weighers to apply when searching for an appropriate host. + * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen. + * @param random A [RandomGenerator] instance for selecting + */ +public class FilterScheduler( + private val filters: List<HostFilter>, + private val weighers: List<HostWeigher>, + private val subsetSize: Int = 1, + private val random: RandomGenerator = SplittableRandom(0), +) : ComputeScheduler { + /** + * The pool of hosts available to the scheduler. + */ + private val hosts = mutableListOf<HostView>() + + init { + require(subsetSize >= 1) { "Subset size must be one or greater" } + } + + override fun addHost(host: HostView) { + hosts.add(host) + } + + override fun removeHost(host: HostView) { + hosts.remove(host) + } + + override fun select(task: ServiceTask): HostView? { + val hosts = hosts + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } } + + val subset = + if (weighers.isNotEmpty()) { + val results = weighers.map { it.getWeights(filteredHosts, task) } + val weights = DoubleArray(filteredHosts.size) + + for (result in results) { + val min = result.min + val range = (result.max - min) + + // Skip result if all weights are the same + if (range == 0.0) { + continue + } + + val multiplier = result.multiplier + val factor = multiplier / range + + for ((i, weight) in result.weights.withIndex()) { + weights[i] += factor * (weight - min) + } + } + + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } + + // fixme: currently finding no matching hosts can result in an error + return when (val maxSize = min(subsetSize, subset.size)) { + 0 -> null + 1 -> subset[0] + else -> subset[random.nextInt(maxSize)] + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt new file mode 100644 index 00000000..43e366d9 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import mu.KotlinLogging +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * Policy replaying VM-cluster assignment. + * + * Within each cluster, the active servers on each node determine which node gets + * assigned the VM image. + */ +public class ReplayScheduler(private val vmPlacements: Map<String, String>) : ComputeScheduler { + private val logger = KotlinLogging.logger {} + + /** + * The pool of hosts available to the scheduler. + */ + private val hosts = mutableListOf<HostView>() + + override fun addHost(host: HostView) { + hosts.add(host) + } + + override fun removeHost(host: HostView) { + hosts.remove(host) + } + + override fun select(task: ServiceTask): HostView? { + val clusterName = + vmPlacements[task.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${task.name}") + val machinesInCluster = hosts.filter { it.host.getName().contains(clusterName) } + + if (machinesInCluster.isEmpty()) { + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${task.name}, assigning randomly." } + return hosts.maxByOrNull { it.availableMemory } + } + + return machinesInCluster.maxByOrNull { it.availableMemory } + ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign") + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt new file mode 100644 index 00000000..99a9390e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.host.HostState +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostFilter] that filters on active hosts. + */ +public class ComputeFilter : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + val result = host.host.getState() == HostState.UP + return result + } + + override fun toString(): String = "ComputeFilter" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/DifferentHostFilter.kt index 412da37f..279a2717 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/DifferentHostFilter.kt @@ -20,27 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.internal +package org.opendc.compute.simulator.scheduler.filters -import org.opendc.compute.api.Task -import org.opendc.compute.simulator.SimMetaWorkloadMapper -import org.opendc.compute.simulator.SimWorkloadMapper -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.compute.workload.SimWorkloads -import java.time.Duration +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import java.util.UUID /** - * A [SimWorkloadMapper] to introduces a boot delay of 1 ms. This object exists to retain the old behavior while - * introducing the possibility of adding custom boot delays. + * A [HostFilter] that ensures an instance is scheduled on a different host from a set of instances. */ -internal object DefaultWorkloadMapper : SimWorkloadMapper { - private val delegate = SimMetaWorkloadMapper() - - override fun createWorkload(task: Task): SimWorkload { - val workload = delegate.createWorkload(task) - - // FIXME: look at connecting this to frontend. This does currently not work correctly - val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(0), 1.0, 0L, 0L) - return SimWorkloads.chain(bootWorkload, workload) +public class DifferentHostFilter : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + @Suppress("UNCHECKED_CAST") + val affinityUUIDs = task.meta["scheduler_hint:different_host"] as? Set<UUID> ?: return true + return host.host.getInstances().none { it.uid in affinityUUIDs } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt new file mode 100644 index 00000000..bb9c1cbf --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A filter used by the [FilterScheduler] to filter hosts. + */ +public fun interface HostFilter { + /** + * Test whether the specified [host] should be included in the selection + * for scheduling the specified [task]. + */ + public fun test( + host: HostView, + task: ServiceTask, + ): Boolean +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt new file mode 100644 index 00000000..53d68acf --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostFilter] that filters hosts based on the number of instances on the host. + * + * @param limit The maximum number of instances on the host. + */ +public class InstanceCountFilter(private val limit: Int) : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + return host.instanceCount < limit + } + + override fun toString(): String = "InstanceCountFilter[limit=$limit]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt new file mode 100644 index 00000000..0b570d52 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostFilter] that filters hosts based on the memory requirements of a [ServiceTask] and the RAM available on the host. + * + * @param allocationRatio Virtual RAM to physical RAM allocation ratio. + */ +public class RamFilter(private val allocationRatio: Double) : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + val requestedMemory = task.flavor.memorySize + val availableMemory = host.availableMemory + val memoryCapacity = host.host.getModel().memoryCapacity + + // Do not allow an instance to overcommit against itself, only against + // other instances. + if (requestedMemory > memoryCapacity) { + return false + } + + val limit = memoryCapacity * allocationRatio + val used = memoryCapacity - availableMemory + val usable = limit - used + + val result = usable >= requestedMemory + return result + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt new file mode 100644 index 00000000..761b125d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import java.util.UUID + +/** + * A [HostFilter] that ensures an instance is scheduled on the same host as all other instances in a set of instances. + */ +public class SameHostFilter : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + @Suppress("UNCHECKED_CAST") + val affinityUUIDs = task.meta["scheduler_hint:same_host"] as? Set<UUID> ?: return true + return host.host.getInstances().any { it.uid in affinityUUIDs } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt new file mode 100644 index 00000000..256caa94 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [ServiceTask] and the available + * capacity on the host. + */ +public class VCpuCapacityFilter : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double + val availableCapacity = host.host.getModel().cpuCapacity + + return requiredCapacity == null || availableCapacity >= (requiredCapacity / task.flavor.coreCount) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt new file mode 100644 index 00000000..c179a7bf --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.filters + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostFilter] that filters hosts based on the vCPU requirements of a [ServiceTask] and the available vCPUs on the host. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. + */ +public class VCpuFilter(private val allocationRatio: Double) : HostFilter { + override fun test( + host: HostView, + task: ServiceTask, + ): Boolean { + val requested = task.flavor.coreCount + val totalCores = host.host.getModel().coreCount + val limit = totalCores * allocationRatio + + // Do not allow an instance to overcommit against itself, only against other instances + if (requested > totalCores) { + return false + } + + val availableCores = limit - host.provisionedCores + return availableCores >= requested + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt new file mode 100644 index 00000000..b6c43c10 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostWeigher] that weighs the hosts based on the available memory per core on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available core memory, and a negative number will result in the scheduler preferring hosts with less available core + * memory. + */ +public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight( + host: HostView, + task: ServiceTask, + ): Double { + return host.availableMemory.toDouble() + } + + override fun toString(): String = "CoreRamWeigher" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt new file mode 100644 index 00000000..c1e0c590 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request. + */ +public interface HostWeigher { + /** + * The multiplier for the weigher. + */ + public val multiplier: Double + + /** + * Obtain the weight of the specified [host] when scheduling the specified [ServiceTask]. + */ + public fun getWeight( + host: HostView, + task: ServiceTask, + ): Double + + /** + * Obtain the weights for [hosts] when scheduling the specified [task]. + */ + public fun getWeights( + hosts: List<HostView>, + task: ServiceTask, + ): Result { + val weights = DoubleArray(hosts.size) + var min = Double.MAX_VALUE + var max = Double.MIN_VALUE + + for ((i, host) in hosts.withIndex()) { + val weight = getWeight(host, task) + weights[i] = weight + min = kotlin.math.min(min, weight) + max = kotlin.math.max(max, weight) + } + + return Result(weights, min, max, multiplier) + } + + /** + * A result returned by the weigher. + * + * @param weights The weights returned by the weigher. + * @param min The minimum weight returned. + * @param max The maximum weight returned. + * @param multiplier The weight multiplier to use. + */ + public class Result( + public val weights: DoubleArray, + public val min: Double, + public val max: Double, + public val multiplier: Double, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt new file mode 100644 index 00000000..9277c1ae --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostWeigher] that weighs the hosts based on the number of instances on the host. + */ +public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight( + host: HostView, + task: ServiceTask, + ): Double { + return host.instanceCount.toDouble() + } + + override fun toString(): String = "InstanceCountWeigher" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt new file mode 100644 index 00000000..1cbfea59 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available memory, and a negative number will result in the scheduler preferring hosts with less memory. + */ +public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight( + host: HostView, + task: ServiceTask, + ): Double { + return host.availableMemory.toDouble() + } + + override fun toString(): String = "RamWeigher" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt new file mode 100644 index 00000000..4f52e11a --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity. + */ +public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight( + host: HostView, + task: ServiceTask, + ): Double { + val model = host.host.getModel() + val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double ?: 0.0 + return model.cpuCapacity - requiredCapacity / task.flavor.coreCount + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt new file mode 100644 index 00000000..3f9a7f03 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.weights + +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask + +/** + * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. + */ +public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher { + init { + require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" } + } + + override fun getWeight( + host: HostView, + task: ServiceTask, + ): Double { + return allocationRatio - host.provisionedCores + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..d5fb991d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt @@ -0,0 +1,664 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher +import org.opendc.compute.api.TaskState +import org.opendc.compute.carbon.CarbonTrace +import org.opendc.compute.simulator.host.SimHost +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.simulator.telemetry.table.HostInfo +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskInfo +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param dispatcher A [Dispatcher] for scheduling the future events. + * @param service The [ComputeService] to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + dispatcher: Dispatcher, + private val service: ComputeService, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5), + private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) + private val clock = dispatcher.timeSource + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = + ServiceTableReaderImpl( + service, + startTime, + ) + + private var loggCounter = 0 + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<SimHost, HostTableReaderImpl>() + + /** + * Mapping from [Task] instances to [TaskTableReaderImpl] + */ + private val taskTableReaders = mutableMapOf<ServiceTask, TaskTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = + scope.launch { + val intervalMs = exportInterval.toMillis() + try { + while (isActive) { + delay(intervalMs) + + loggState() + } + } finally { + loggState() + + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + private fun loggState() { + loggCounter++ + try { + val now = this.clock.instant() + + for (host in this.service.hosts) { + val reader = + this.hostTableReaders.computeIfAbsent(host) { + HostTableReaderImpl( + it, + startTime, + carbonTrace, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + for (task in this.service.tasks) { + val reader = + this.taskTableReaders.computeIfAbsent(task) { + TaskTableReaderImpl( + service, + it, + startTime, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + for (task in this.service.tasksToRemove) { + task.delete() + } + this.service.clearTasksToRemove() + + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader.copy()) + + if (loggCounter >= 100) { + var loggString = "\n\t\t\t\t\tMetrics after ${now.toEpochMilli() / 1000 / 60 / 60} hours:\n" + loggString += "\t\t\t\t\t\tTasks Total: ${this.serviceTableReader.tasksTotal}\n" + loggString += "\t\t\t\t\t\tTasks Active: ${this.serviceTableReader.tasksActive}\n" + loggString += "\t\t\t\t\t\tTasks Pending: ${this.serviceTableReader.tasksPending}\n" + loggString += "\t\t\t\t\t\tTasks Completed: ${this.serviceTableReader.tasksCompleted}\n" + loggString += "\t\t\t\t\t\tTasks Terminated: ${this.serviceTableReader.tasksTerminated}\n" + + this.logger.warn { loggString } + loggCounter = 0 + } + } catch (cause: Throwable) { + this.logger.warn(cause) { "Exporter threw an Exception" } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl( + private val service: ComputeService, + private val startTime: Duration = Duration.ofMillis(0), + ) : ServiceTableReader { + override fun copy(): ServiceTableReader { + val newServiceTable = + ServiceTableReaderImpl( + service, + ) + newServiceTable.setValues(this) + + return newServiceTable + } + + override fun setValues(table: ServiceTableReader) { + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _hostsUp = table.hostsUp + _hostsDown = table.hostsDown + _tasksTotal = table.tasksTotal + _tasksPending = table.tasksPending + _tasksActive = table.tasksActive + _tasksCompleted = table.tasksCompleted + _tasksTerminated = table.tasksTerminated + _attemptsSuccess = table.attemptsSuccess + _attemptsFailure = table.attemptsFailure + } + + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + private var _timestampAbsolute: Instant = Instant.MIN + override val timestampAbsolute: Instant + get() = _timestampAbsolute + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val tasksTotal: Int + get() = _tasksTotal + private var _tasksTotal = 0 + + override val tasksPending: Int + get() = _tasksPending + private var _tasksPending = 0 + + override val tasksCompleted: Int + get() = _tasksCompleted + private var _tasksCompleted = 0 + + override val tasksActive: Int + get() = _tasksActive + private var _tasksActive = 0 + + override val tasksTerminated: Int + get() = _tasksTerminated + private var _tasksTerminated = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + _timestampAbsolute = now + startTime + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _tasksTotal = stats.tasksTotal + _tasksPending = stats.tasksPending + _tasksCompleted = stats.tasksCompleted + _tasksActive = stats.tasksActive + _tasksTerminated = stats.tasksTerminated + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl( + host: SimHost, + private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), + ) : HostTableReader { + override fun copy(): HostTableReader { + val newHostTable = + HostTableReaderImpl(_host) + newHostTable.setValues(this) + + return newHostTable + } + + override fun setValues(table: HostTableReader) { + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _guestsTerminated = table.guestsTerminated + _guestsRunning = table.guestsRunning + _guestsError = table.guestsError + _guestsInvalid = table.guestsInvalid + _cpuLimit = table.cpuLimit + _cpuDemand = table.cpuDemand + _cpuUsage = table.cpuUsage + _cpuUtilization = table.cpuUtilization + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _powerDraw = table.powerDraw + _energyUsage = table.energyUsage + _carbonIntensity = table.carbonIntensity + _carbonEmission = table.carbonEmission + _uptime = table.uptime + _downtime = table.downtime + _bootTime = table.bootTime + _bootTimeAbsolute = table.bootTimeAbsolute + } + + private val _host = host + + override val host: HostInfo = + HostInfo( + host.getUid().toString(), + host.getName(), + "x86", + host.getModel().coreCount, + host.getModel().cpuCapacity, + host.getModel().memoryCapacity, + ) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val timestampAbsolute: Instant + get() = _timestampAbsolute + private var _timestampAbsolute = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Float + get() = _cpuLimit + private var _cpuLimit = 0.0f + + override val cpuUsage: Float + get() = _cpuUsage + private var _cpuUsage = 0.0f + + override val cpuDemand: Float + get() = _cpuDemand + private var _cpuDemand = 0.0f + + override val cpuUtilization: Float + get() = _cpuUtilization + private var _cpuUtilization = 0.0f + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerDraw: Float + get() = _powerDraw + private var _powerDraw = 0.0f + + override val energyUsage: Float + get() = _energyUsage - previousEnergyUsage + private var _energyUsage = 0.0f + private var previousEnergyUsage = 0.0f + + override val carbonIntensity: Float + get() = _carbonIntensity + private var _carbonIntensity = 0.0f + + override val carbonEmission: Float + get() = _carbonEmission + private var _carbonEmission = 0.0f + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val bootTimeAbsolute: Instant? + get() = _bootTimeAbsolute + private var _bootTimeAbsolute: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _timestampAbsolute = now + startTime + + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerDraw = hostSysStats.powerDraw + _energyUsage = hostSysStats.energyUsage + _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute) + + _carbonEmission = carbonIntensity * (energyUsage / 3600000.0f) // convert energy usage from J to kWh + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + _bootTime = hostSysStats.bootTime + startTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousEnergyUsage = _energyUsage + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0f + _cpuUsage = 0.0f + _cpuDemand = 0.0f + _cpuUtilization = 0.0f + + _powerDraw = 0.0f + _energyUsage = 0.0f + _carbonIntensity = 0.0f + _carbonEmission = 0.0f + } + } + + /** + * An aggregator for task metrics before they are reported. + */ + private class TaskTableReaderImpl( + private val service: ComputeService, + private val task: ServiceTask, + private val startTime: Duration = Duration.ofMillis(0), + ) : TaskTableReader { + override fun copy(): TaskTableReader { + val newTaskTable = + TaskTableReaderImpl( + service, + task, + ) + newTaskTable.setValues(this) + + return newTaskTable + } + + override fun setValues(table: TaskTableReader) { + host = table.host + + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _cpuLimit = table.cpuLimit + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _uptime = table.uptime + _downtime = table.downtime + _provisionTime = table.provisionTime + _bootTime = table.bootTime + _bootTimeAbsolute = table.bootTimeAbsolute + + _creationTime = table.creationTime + _finishTime = table.finishTime + + _taskState = table.taskState + } + + /** + * The static information about this task. + */ + override val taskInfo = + TaskInfo( + task.uid.toString(), + task.name, + "vm", + "x86", + task.flavor.coreCount, + task.flavor.memorySize, + ) + + /** + * The [HostInfo] of the host on which the task is hosted. + */ + override var host: HostInfo? = null + private var _host: SimHost? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + private var _timestampAbsolute = Instant.MIN + override val timestampAbsolute: Instant + get() = _timestampAbsolute + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val creationTime: Instant? + get() = _creationTime + private var _creationTime: Instant? = null + + override val finishTime: Instant? + get() = _finishTime + private var _finishTime: Instant? = null + + override val cpuLimit: Float + get() = _cpuLimit + private var _cpuLimit = 0.0f + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val bootTimeAbsolute: Instant? + get() = _bootTimeAbsolute + private var _bootTimeAbsolute: Instant? = null + + override val taskState: TaskState? + get() = _taskState + private var _taskState: TaskState? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(task) + if (newHost != null && newHost.getUid() != _host?.getUid()) { + _host = newHost + host = + HostInfo( + newHost.getUid().toString(), + newHost.getName(), + "x86", + newHost.getModel().coreCount, + newHost.getModel().cpuCapacity, + newHost.getModel().memoryCapacity, + ) + } + + val cpuStats = _host?.getCpuStats(task) + val sysStats = _host?.getSystemStats(task) + + _timestamp = now + _timestampAbsolute = now + startTime + + _cpuLimit = cpuStats?.capacity ?: 0.0f + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = task.launchedAt + _bootTime = sysStats?.bootTime + _creationTime = task.createdAt + _finishTime = task.finishedAt + + _taskState = task.state + + if (sysStats != null) { + _bootTimeAbsolute = sysStats.bootTime + startTime + } else { + _bootTimeAbsolute = null + } + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0f + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt new file mode 100644 index 00000000..534bcc09 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry + +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: TaskTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServiceTableReader) {} +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt new file mode 100644 index 00000000..3f220ad1 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.builtins.ListSerializer +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonDecoder +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.jsonObject +import org.opendc.common.logger.logger +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.trace.util.parquet.exporter.ColListSerializer +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.columnSerializer + +/** + * Aggregates the necessary settings to personalize the output + * parquet files for compute workloads. + * + * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. + * @param[taskExportColumns] the columns that will be included in the `task.parquet` raw output file. + * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. + */ +@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) +public data class ComputeExportConfig( + public val hostExportColumns: Set<ExportColumn<HostTableReader>>, + public val taskExportColumns: Set<ExportColumn<TaskTableReader>>, + public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>, +) { + public constructor( + hostExportColumns: Collection<ExportColumn<HostTableReader>>, + taskExportColumns: Collection<ExportColumn<TaskTableReader>>, + serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>, + ) : this( + hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS, + taskExportColumns.toSet() + DfltTaskExportColumns.BASE_EXPORT_COLUMNS, + serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS, + ) + + /** + * @return formatted string representing the export config. + */ + public fun fmt(): String = + """ + | === Compute Export Config === + | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} + | Task columns : ${taskExportColumns.map { it.name }.toString().trim('[', ']')} + | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} + """.trimIndent() + + public companion object { + internal val LOG by logger() + + /** + * Force the jvm to load the default [ExportColumn]s relevant to compute export, + * so that they are available for deserialization. + */ + public fun loadDfltColumns() { + DfltHostExportColumns + DfltTaskExportColumns + DfltServiceExportColumns + } + + /** + * Config that includes all columns defined in [DfltHostExportColumns], + * [DfltTaskExportColumns], [DfltServiceExportColumns] among all other loaded + * columns for [HostTableReader], [TaskTableReader] and [ServiceTableReader]. + */ + public val ALL_COLUMNS: ComputeExportConfig by lazy { + ComputeExportConfig.Companion.loadDfltColumns() + ComputeExportConfig( + hostExportColumns = ExportColumn.getAllLoadedColumns(), + taskExportColumns = ExportColumn.getAllLoadedColumns(), + serviceExportColumns = ExportColumn.getAllLoadedColumns(), + ) + } + + /** + * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs. + * + * This serializer makes use of reified column serializers for the 2 properties. + */ + internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") { + element( + "hostExportColumns", + ListSerializer(columnSerializer<HostTableReader>()).descriptor, + ) + element( + "taskExportColumns", + ListSerializer(columnSerializer<TaskTableReader>()).descriptor, + ) + element( + "serviceExportColumns", + ListSerializer(columnSerializer<ServiceTableReader>()).descriptor, + ) + } + + override fun deserialize(decoder: Decoder): ComputeExportConfig { + val jsonDec = + (decoder as? JsonDecoder) ?: let { + // Basically a recursive call with a JsonDecoder. + return json.decodeFromString(decoder.decodeString().trim('"')) + } + + // Loads the default columns so that they are available for deserialization. + ComputeExportConfig.Companion.loadDfltColumns() + val elem = jsonDec.decodeJsonElement().jsonObject + + val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList() + val taskFields: List<ExportColumn<TaskTableReader>> = elem["taskExportColumns"].toFieldList() + val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList() + + return ComputeExportConfig( + hostExportColumns = hostFields, + taskExportColumns = taskFields, + serviceExportColumns = serviceFields, + ) + } + + override fun serialize( + encoder: Encoder, + value: ComputeExportConfig, + ) { + encoder.encodeStructure(ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor) { + encodeSerializableElement( + ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + 0, + ColListSerializer(columnSerializer<HostTableReader>()), + value.hostExportColumns.toList(), + ) + encodeSerializableElement( + ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + 1, + ColListSerializer(columnSerializer<TaskTableReader>()), + value.taskExportColumns.toList(), + ) + encodeSerializableElement( + ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + 2, + ColListSerializer(columnSerializer<ServiceTableReader>()), + value.serviceExportColumns.toList(), + ) + } + } + } + } +} + +private val json = Json { ignoreUnknownKeys = true } + +private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> = + this?.let { + json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it) + }?.ifEmpty { + ComputeExportConfig.Companion.LOG.warn( + "deserialized list of export columns for exportable ${T::class.simpleName} " + + "produced empty list, falling back to all loaded columns", + ) + ExportColumn.getAllLoadedColumns<T>() + } ?: ExportColumn.getAllLoadedColumns<T>() diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt new file mode 100644 index 00000000..1b76da6b --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltHostExportColumns + * ... + * ``` + */ +public object DfltHostExportColumns { + public val TIMESTAMP: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOST_ID: ExportColumn<HostTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { Binary.fromString(it.host.id) } + + public val HOST_NAME: ExportColumn<HostTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_name"), + ) { Binary.fromString(it.host.name) } + + public val CPU_COUNT: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("core_count"), + ) { it.host.coreCount } + + public val MEM_CAPACITY: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.host.memCapacity } + + public val GUESTS_TERMINATED: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_terminated"), + ) { it.guestsTerminated } + + public val GUESTS_RUNNING: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_running"), + ) { it.guestsRunning } + + public val GUESTS_ERROR: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_error"), + ) { it.guestsError } + + public val GUESTS_INVALID: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_invalid"), + ) { it.guestsInvalid } + + public val CPU_LIMIT: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_USAGE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("cpu_usage"), + ) { it.cpuUsage } + + public val CPU_DEMAND: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("cpu_demand"), + ) { it.cpuDemand } + + public val CPU_UTILIZATION: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("cpu_utilization"), + ) { it.cpuUtilization } + + public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val POWER_DRAW: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("power_draw"), + ) { it.powerDraw } + + public val ENERGY_USAGE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("energy_usage"), + ) { it.energyUsage } + + public val CARBON_INTENSITY: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("carbon_intensity"), + ) { it.carbonIntensity } + + public val CARBON_EMISSION: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("carbon_emission"), + ) { it.carbonEmission } + + public val UP_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val BOOT_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt new file mode 100644 index 00000000..aa08e8ff --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltServiceExportColumns + * ... + * ``` + */ +public object DfltServiceExportColumns { + public val TIMESTAMP: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOSTS_UP: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("hosts_up"), + ) { it.hostsUp } + + public val TASKS_PENDING: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("tasks_pending"), + ) { it.tasksPending } + + public val TASKS_TOTAL: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("tasks_total"), + ) { it.tasksTotal } + + public val TASKS_ACTIVE: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("tasks_active"), + ) { it.tasksActive } + + public val TASKS_COMPLETED: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("tasks_completed"), + ) { it.tasksCompleted } + + public val TASKS_FAILED: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("tasks_terminated"), + ) { it.tasksTerminated } + + public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("attempts_success"), + ) { it.attemptsSuccess } + + public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("attempts_failure"), + ) { it.attemptsFailure } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt new file mode 100644 index 00000000..6658e444 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltTaskExportColumns + * ... + * ``` + */ +public object DfltTaskExportColumns { + public val TIMESTAMP: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val TASK_ID: ExportColumn<TaskTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_id"), + ) { Binary.fromString(it.taskInfo.id) } + + public val HOST_ID: ExportColumn<TaskTableReader> = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { it.host?.id?.let { Binary.fromString(it) } } + + public val TASK_NAME: ExportColumn<TaskTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_name"), + ) { Binary.fromString(it.taskInfo.name) } + + public val CPU_COUNT: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.taskInfo.cpuCount } + + public val MEM_CAPACITY: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.taskInfo.memCapacity } + + public val CPU_LIMIT: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_TIME_ACTIVE: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val UP_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val PROVISION_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT64).named("provision_time"), + ) { it.provisionTime?.toEpochMilli() } + + public val BOOT_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val CREATION_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT64).named("creation_time"), + ) { it.creationTime?.toEpochMilli() } + + public val FINISH_TIME: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT64).named("finish_time"), + ) { it.finishTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + public val TASK_STATE: ExportColumn<TaskTableReader> = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_state"), + ) { Binary.fromString(it.taskState?.name) } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt new file mode 100644 index 00000000..4cd920c4 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.Exporter +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMonitor( + private val hostExporter: Exporter<HostTableReader>, + private val taskExporter: Exporter<TaskTableReader>, + private val serviceExporter: Exporter<ServiceTableReader>, +) : ComputeMonitor, AutoCloseable { + override fun record(reader: HostTableReader) { + hostExporter.write(reader) + } + + override fun record(reader: TaskTableReader) { + taskExporter.write(reader) + } + + override fun record(reader: ServiceTableReader) { + serviceExporter.write(reader) + } + + override fun close() { + hostExporter.close() + taskExporter.close() + serviceExporter.close() + } + + public companion object { + /** + * Overloaded constructor with [ComputeExportConfig] as parameter. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + computeExportConfig: ComputeExportConfig, + ): ParquetComputeMonitor = + invoke( + base = base, + partition = partition, + bufferSize = bufferSize, + hostExportColumns = computeExportConfig.hostExportColumns, + taskExportColumns = computeExportConfig.taskExportColumns, + serviceExportColumns = computeExportConfig.serviceExportColumns, + ) + + /** + * Constructor that loads default [ExportColumn]s defined in + * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltServiceExportColumns] + * in case optional parameters are omitted and all fields need to be retrieved. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null, + taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null, + serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null, + ): ParquetComputeMonitor { + // Loads the fields in case they need to be retrieved if optional params are omitted. + ComputeExportConfig.loadDfltColumns() + + return ParquetComputeMonitor( + hostExporter = + Exporter( + outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, + columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + taskExporter = + Exporter( + outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() }, + columns = taskExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serviceExporter = + Exporter( + outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, + columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + ) + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md new file mode 100644 index 00000000..3baafed4 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md @@ -0,0 +1,70 @@ +### Summary +Added output configuration, that can be defined in the scenario `.json` file, that allows to select which columns are to be included in the raw oputput files `host.parquet`, `task.parquet` and `service.parquet`. + +### Columns +The 'default' columns are defined in `DfltHostExportcolumns`, `DfltTaskExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be definied anywhere (`ExportColumn<Exportable>`) and it is going to be deserializable as long as it is loaded by the jvm. + +### Deserialization +Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in case-insensitive manner, either with `_` as in the name or with ` ` (blank space). + +###### E.g.: +***column name*** = "cpuModel\_count" +***default column regex*** = "\\s*(?:cpu_count|cpuModel count)\\s*" (case insensitive) +***matches*** = "cpuModel\_count", "cpuModel count", "CpU/_cOuNt" etc. + +### JSON Schema +```json +// scenario.json +{ + ... + "computeExportConfig": { + "type": "object", + "properties": { + "hostExportColumns": { "type": "array" }, + "taskExportColumns": { "type": "array" } , + "serviceExportColumns": { "type": "array" } , + "required": [ /* NONE REQUIRED */ ] + } + }, + ... + "required": [ + ... + // NOT REQUIRED + ] +} +``` + + +###### Bad Formatting Cases +- If a column name (and type) does not match any deserializable column, the entry is ignored and error message is logged. +- If an empty list of columns is provided or those that are provided were not deserializable, then all loaded columns for that `Exportable` are used, and a warning message is logged. +- If no list is provided, then all loaded columns for that `Exportable` are used. + + +### Example + +```json +// scenario.json +{ + ... + "computeExportConfig": { + "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"], + "taskExportColumns": ["invalid-entry2"], + "serviceExportColumns": ["timestamp", "tasks_active", "tasks_pending"] + }, + ... +``` + +```json +// console output +10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring... +10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring... +10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable TaskTableReader produced empty list, falling back to all loaded columns +10:51:56.584 [INFO] ScenariosSpec - +| === Compute Export Config === +| Host columns : timestamp, timestamp_absolute, guests_invalid +| Task columns : timestamp, timestamp_absolute, task_id, task_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute +| Service columns : timestamp, tasks_active, tasks_pending + +``` + diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostInfo.kt index 907f6acd..1f1b9522 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostInfo.kt @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.simulator - -import org.opendc.compute.api.Task -import org.opendc.simulator.compute.workload.SimWorkload +package org.opendc.compute.simulator.telemetry.table /** - * A [SimWorkloadMapper] that maps a [Task] to a workload via the meta-data. + * Information about a host exposed to the telemetry service. */ -public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper { - override fun createWorkload(task: Task): SimWorkload { - return requireNotNull(task.meta[key] ?: task.image.meta[key]) as SimWorkload - } -} +public data class HostInfo( + val id: String, + val name: String, + val arch: String, + val coreCount: Int, + val coreSpeed: Float, + val memCapacity: Long, +) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..5f09e7f5 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.trace.util.parquet.exporter.Exportable +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader : Exportable { + public fun copy(): HostTableReader + + public fun setValues(table: HostTableReader) + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The timestamp of the current entry of the reader relative to the start of the workload. + */ + public val timestamp: Instant + + /** + * The timestamp of the current entry of the reader. + */ + public val timestampAbsolute: Instant + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Float + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Float + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Float + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Float + + /** + * The duration (in ms) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in ms) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in ms) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in ms) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power draw of the host in W. + */ + public val powerDraw: Float + + /** + * The total energy consumption of the host since last sample in J. + */ + public val energyUsage: Float + + /** + * The current carbon intensity of the host in gCO2 / kW. + */ + public val carbonIntensity: Float + + /** + * The current carbon emission since the last deadline in g. + */ + public val carbonEmission: Float + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted relative to the start of the workload. + */ + public val bootTime: Instant? + + /** + * The [Instant] at which the host booted. + */ + public val bootTimeAbsolute: Instant? +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..16c38297 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val tasksTotal: Int, + val tasksPending: Int, + val tasksActive: Int, + val attemptsSuccess: Int, + val attemptsTerminated: Int, +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData( + timestamp, + hostsUp, + hostsDown, + tasksTotal, + tasksPending, + tasksActive, + attemptsSuccess, + attemptsFailure, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..690dfe0a --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.trace.util.parquet.exporter.Exportable +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader : Exportable { + public fun copy(): ServiceTableReader + + public fun setValues(table: ServiceTableReader) + + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The timestamp of the current entry of the reader. + */ + public val timestampAbsolute: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of tasks that are registered with the compute service. + */ + public val tasksTotal: Int + + /** + * The number of tasks that are pending to be scheduled. + */ + public val tasksPending: Int + + /** + * The number of tasks that are currently active. + */ + public val tasksActive: Int + + /** + * The number of tasks that completed the tasks successfully + */ + public val tasksCompleted: Int + + /** + * The number of tasks that failed more times than allowed and are thus terminated + */ + public val tasksTerminated: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt new file mode 100644 index 00000000..6ff56541 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +/** + * Static information about a task exposed to the telemetry service. + */ +public data class TaskInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val cpuCount: Int, + val memCapacity: Long, +) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt new file mode 100644 index 00000000..bc6a4edd --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.telemetry.parquet.DfltTaskExportColumns +import org.opendc.trace.util.parquet.exporter.Exportable +import java.time.Instant + +/** + * An interface that is used to read a row of a task trace entry. + */ +public interface TaskTableReader : Exportable { + public fun copy(): TaskTableReader + + public fun setValues(table: TaskTableReader) + + /** + * The timestamp of the current entry of the reader relative to the start of the workload. + */ + public val timestamp: Instant + + /** + * The timestamp of the current entry of the reader. + */ + public val timestampAbsolute: Instant + + /** + * The [TaskInfo] of the task to which the row belongs to. + */ + public val taskInfo: TaskInfo + + /** + * The [HostInfo] of the host on which the task is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the task was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the task booted relative to the start of the workload. + */ + public val bootTime: Instant? + + /** + * The [Instant] at which the task booted. + */ + public val bootTimeAbsolute: Instant? + + /** + * The [Instant] at which the task booted relative to the start of the workload. + */ + public val creationTime: Instant? + + /** + * The [Instant] at which the task booted relative to the start of the workload. + */ + public val finishTime: Instant? + + /** + * The capacity of the CPUs of Host on which the task is running (in MHz). + */ + public val cpuLimit: Float + + /** + * The duration (in seconds) that a CPU was active in the task. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the task. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The state of the task + */ + public val taskState: TaskState? +} + +// Loads the default export fields for deserialization whenever this file is loaded. +private val _ignore = DfltTaskExportColumns diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt deleted file mode 100644 index b5bc66a9..00000000 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.suspendCancellableCoroutine -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Task -import org.opendc.compute.api.TaskState -import org.opendc.compute.api.TaskWatcher -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.model.Cpu -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimTrace -import org.opendc.simulator.compute.workload.SimTraceFragment -import org.opendc.simulator.flow2.FlowEngine -import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory -import org.opendc.simulator.kotlin.runSimulation -import java.time.Instant -import java.util.SplittableRandom -import java.util.UUID -import kotlin.coroutines.resume - -/** - * Basic test-suite for the hypervisor. - */ -internal class SimHostTest { - private lateinit var machineModel: MachineModel - - @BeforeEach - fun setUp() { - machineModel = - MachineModel( - Cpu( - 0, - 2, - 3200.0, - "Intel", - "Xeon", - "amd64", - ), - // memory - MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000 * 4), - ) - } - - /** - * Test a single virtual machine hosted by the hypervisor. - */ - @Test - fun testSingle() = - runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - val host = - SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor, - ) - val vmImage = - MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2), - ).createWorkload(0), - ), - ) - - val flavor = MockFlavor(2, 0) - - suspendCancellableCoroutine { cont -> - host.addListener( - object : HostListener { - private var finished = 0 - - override fun onStateChanged( - host: Host, - task: Task, - newState: TaskState, - ) { - if (newState == TaskState.TERMINATED && ++finished == 1) { - cont.resume(Unit) - } - } - }, - ) - val server = MockTask(UUID.randomUUID(), "a", flavor, vmImage) - host.spawn(server) - host.start(server) - } - - // Ensure last cycle is collected -// delay(1000L * duration) - host.close() - - val cpuStats = host.getCpuStats() - - assertAll( - { assertEquals(450000, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(750000, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(4688, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1200000, timeSource.millis()) }, - ) - } - - /** - * Test overcommitting of resources by the hypervisor. - */ - @Test - fun testOvercommitted() = - runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - val host = - SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor, - ) - val vmImageA = - MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2), - ).createWorkload(0), - ), - ) - val vmImageB = - MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2), - ).createWorkload(0), - ), - ) - - val flavor = MockFlavor(2, 0) - - coroutineScope { - suspendCancellableCoroutine { cont -> - host.addListener( - object : HostListener { - private var finished = 0 - - override fun onStateChanged( - host: Host, - task: Task, - newState: TaskState, - ) { - if (newState == TaskState.TERMINATED && ++finished == 2) { - cont.resume(Unit) - } - } - }, - ) - val serverA = MockTask(UUID.randomUUID(), "a", flavor, vmImageA) - host.spawn(serverA) - val serverB = MockTask(UUID.randomUUID(), "b", flavor, vmImageB) - host.spawn(serverB) - - host.start(serverA) - host.start(serverB) - } - } - - // Ensure last cycle is collected - delay(1000L * duration) - host.close() - - val cpuStats = host.getCpuStats() - - assertAll( - { assertEquals(600000, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(900000, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(309375, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1500000, timeSource.millis()) }, - ) - } - - /** - * Test failure of the host. - */ - @Test - fun testFailure() = - runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - val host = - SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor, - ) - val image = - MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2), - ).createWorkload(0), - ), - ) - val flavor = MockFlavor(2, 0) - val server = MockTask(UUID.randomUUID(), "a", flavor, image) - - coroutineScope { - host.spawn(server) - host.start(server) - delay(5000L) - host.fail() - delay(duration * 1000) - host.recover() - - suspendCancellableCoroutine { cont -> - host.addListener( - object : HostListener { - override fun onStateChanged( - host: Host, - task: Task, - newState: TaskState, - ) { - if (newState == TaskState.TERMINATED) { - cont.resume(Unit) - } - } - }, - ) - } - } - - host.close() - // Ensure last cycle is collected - delay(1000L * duration) - - val cpuStats = host.getCpuStats() - val sysStats = host.getSystemStats() - val guestSysStats = host.getSystemStats(server) - - assertAll( - { assertEquals(755000, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(450000, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(1205000, sysStats.uptime.toMillis(), "Uptime does not match") }, - { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, - { assertEquals(1205000, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, - { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") }, - ) - } - - private class MockFlavor( - override val coreCount: Int, - override val memorySize: Long, - ) : Flavor { - override val uid: UUID = UUID.randomUUID() - override val name: String = "test" - override val labels: Map<String, String> = emptyMap() - override val meta: Map<String, Any> = emptyMap() - - override fun delete() { - throw NotImplementedError() - } - - override fun reload() { - throw NotImplementedError() - } - } - - private class MockImage( - override val uid: UUID, - override val name: String, - override val labels: Map<String, String>, - override val meta: Map<String, Any>, - ) : Image { - override fun delete() { - throw NotImplementedError() - } - - override fun reload() { - throw NotImplementedError() - } - } - - private class MockTask( - override val uid: UUID, - override val name: String, - override val flavor: Flavor, - override val image: Image, - override val numFailures: Int = 10, - ) : Task { - override val labels: Map<String, String> = emptyMap() - - override val meta: Map<String, Any> = emptyMap() - - override val state: TaskState = TaskState.TERMINATED - - override val launchedAt: Instant? = null - - override fun start() {} - - override fun stop() {} - - override fun delete() {} - - override fun watch(watcher: TaskWatcher) {} - - override fun unwatch(watcher: TaskWatcher) {} - - override fun reload() {} - } -} |
