summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java41
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java32
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java43
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java652
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java78
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java107
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java95
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java230
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java43
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java46
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java50
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java45
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt61
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt76
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt378
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/TaskWatcher.kt (renamed from opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt)19
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt369
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt247
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt16
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt24
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt49
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt118
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt111
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt42
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/DifferentHostFilter.kt (renamed from opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt)31
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt40
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt42
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt55
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt41
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt42
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt50
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt44
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt80
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt43
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt42
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt46
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt664
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt47
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt192
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt195
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt105
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt171
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt125
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md70
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostInfo.kt (renamed from opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt)20
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt150
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt55
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt90
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt125
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt393
60 files changed, 5082 insertions, 1118 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java
new file mode 100644
index 00000000..01acfa97
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostListener.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.host;
+
+import org.opendc.compute.api.TaskState;
+import org.opendc.compute.simulator.service.ServiceTask;
+
+/**
+ * Listener interface for events originating from a {@link SimHost}.
+ */
+public interface HostListener {
+ /**
+ * This method is invoked when the state of <code>task</code> on <code>host</code> changes.
+ */
+ default void onStateChanged(SimHost host, ServiceTask task, TaskState newState) {}
+
+ /**
+ * This method is invoked when the state of a {@link SimHost} has changed.
+ */
+ default void onStateChanged(SimHost host, HostState newState) {}
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java
new file mode 100644
index 00000000..96236c5c
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostModel.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.host;
+
+/**
+ * Record describing the static machine properties of the host.
+ *
+ * @param cpuCapacity The total CPU capacity of the host in MHz.
+ * @param coreCount The number of logical processing cores available for this host.
+ * @param memoryCapacity The amount of memory available for this host in MB.
+ */
+public record HostModel(float cpuCapacity, int coreCount, long memoryCapacity) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java
new file mode 100644
index 00000000..29fc8cb4
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/host/HostState.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.host;
+
+/**
+ * The state of a host.
+ */
+public enum HostState {
+ /**
+ * The host is up and able to host guests.
+ */
+ UP,
+
+ /**
+ * The host is in a (forced) down state and unable to host any guests.
+ */
+ DOWN,
+
+ /**
+ * The host is in an error state and unable to host any guests.
+ */
+ ERROR
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
new file mode 100644
index 00000000..84e23516
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
@@ -0,0 +1,652 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.service;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.InstantSource;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SplittableRandom;
+import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.util.Pacer;
+import org.opendc.compute.api.Flavor;
+import org.opendc.compute.api.Image;
+import org.opendc.compute.api.TaskState;
+import org.opendc.compute.simulator.host.HostListener;
+import org.opendc.compute.simulator.host.HostModel;
+import org.opendc.compute.simulator.host.HostState;
+import org.opendc.compute.simulator.host.SimHost;
+import org.opendc.compute.simulator.scheduler.ComputeScheduler;
+import org.opendc.compute.simulator.telemetry.SchedulerStats;
+import org.opendc.simulator.compute.workload.Workload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link ComputeService} hosts the API implementation of the OpenDC Compute Engine.
+ */
+public final class ComputeService implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ComputeService.class);
+
+ /**
+ * The {@link InstantSource} representing the clock tracking the (simulation) time.
+ */
+ private final InstantSource clock;
+
+ /**
+ * The {@link ComputeScheduler} responsible for placing the tasks onto hosts.
+ */
+ private final ComputeScheduler scheduler;
+
+ /**
+ * The {@link Pacer} used to pace the scheduling requests.
+ */
+ private final Pacer pacer;
+
+ /**
+ * The {@link SplittableRandom} used to generate the unique identifiers for the service resources.
+ */
+ private final SplittableRandom random = new SplittableRandom(0);
+
+ private final int maxNumFailures;
+
+ /**
+ * A flag to indicate that the service is closed.
+ */
+ private boolean isClosed;
+
+ /**
+ * A mapping from host to host view.
+ */
+ private final Map<SimHost, HostView> hostToView = new HashMap<>();
+
+ /**
+ * The available hypervisors.
+ */
+ private final Set<HostView> availableHosts = new HashSet<>();
+
+ /**
+ * The tasks that should be launched by the service.
+ */
+ private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
+
+ /**
+ * The active tasks in the system.
+ */
+ private final Map<ServiceTask, SimHost> activeTasks = new HashMap<>();
+
+ /**
+ * The active tasks in the system.
+ */
+ private final Map<ServiceTask, SimHost> completedTasks = new HashMap<>();
+
+ /**
+ * The registered flavors for this compute service.
+ */
+ private final Map<UUID, ServiceFlavor> flavorById = new HashMap<>();
+
+ private final List<ServiceFlavor> flavors = new ArrayList<>();
+
+ /**
+ * The registered images for this compute service.
+ */
+ private final Map<UUID, ServiceImage> imageById = new HashMap<>();
+
+ private final List<ServiceImage> images = new ArrayList<>();
+
+ /**
+ * The registered tasks for this compute service.
+ */
+ private final Map<UUID, ServiceTask> taskById = new HashMap<>();
+
+ private final List<ServiceTask> tasks = new ArrayList<>();
+
+ private final List<ServiceTask> tasksToRemove = new ArrayList<>();
+
+ /**
+ * A [HostListener] used to track the active tasks.
+ */
+ private final HostListener hostListener = new HostListener() {
+ @Override
+ public void onStateChanged(@NotNull SimHost host, @NotNull HostState newState) {
+ LOGGER.debug("Host {} state changed: {}", host, newState);
+
+ final HostView hv = hostToView.get(host);
+
+ if (hv != null) {
+ if (newState == HostState.UP) {
+ availableHosts.add(hv);
+ } else {
+ availableHosts.remove(hv);
+ }
+ }
+
+ // Re-schedule on the new machine
+ requestSchedulingCycle();
+ }
+
+ @Override
+ public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @NotNull TaskState newState) {
+ if (task.getHost() != host) {
+ // This can happen when a task is rescheduled and started on another machine, while being deleted from
+ // the old machine.
+ return;
+ }
+
+ task.setState(newState);
+
+ if (newState == TaskState.COMPLETED || newState == TaskState.TERMINATED || newState == TaskState.FAILED) {
+ LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor());
+
+ if (activeTasks.remove(task) != null) {
+ tasksActive--;
+ }
+
+ HostView hv = hostToView.get(host);
+ final ServiceFlavor flavor = task.getFlavor();
+ if (hv != null) {
+ hv.provisionedCores -= flavor.getCoreCount();
+ hv.instanceCount--;
+ hv.availableMemory += flavor.getMemorySize();
+ } else {
+ LOGGER.error("Unknown host {}", host);
+ }
+
+ task.setHost(null);
+ host.removeTask(task);
+
+ if (newState == TaskState.COMPLETED) {
+ tasksCompleted++;
+ }
+ if (newState == TaskState.TERMINATED) {
+ tasksTerminated++;
+ }
+
+ if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
+ tasksToRemove.add(task);
+ }
+
+ // Try to reschedule if needed
+ requestSchedulingCycle();
+ }
+ }
+ };
+
+ private int maxCores = 0;
+ private long maxMemory = 0L;
+ private long attemptsSuccess = 0L;
+ private long attemptsFailure = 0L;
+ private int tasksTotal = 0;
+ private int tasksPending = 0;
+ private int tasksActive = 0;
+ private int tasksTerminated = 0;
+ private int tasksCompleted = 0;
+
+ /**
+ * Construct a {@link ComputeService} instance.
+ */
+ public ComputeService(Dispatcher dispatcher, ComputeScheduler scheduler, Duration quantum, int maxNumFailures) {
+ this.clock = dispatcher.getTimeSource();
+ this.scheduler = scheduler;
+ this.pacer = new Pacer(dispatcher, quantum.toMillis(), (time) -> doSchedule());
+ this.maxNumFailures = maxNumFailures;
+ }
+
+ /**
+ * Create a new {@link Builder} instance.
+ */
+ public static Builder builder(Dispatcher dispatcher, ComputeScheduler scheduler) {
+ return new Builder(dispatcher, scheduler);
+ }
+
+ /**
+ * Create a new {@link ComputeClient} to control the compute service.
+ */
+ public ComputeClient newClient() {
+ if (isClosed) {
+ throw new IllegalStateException("Service is closed");
+ }
+ return new ComputeClient(this);
+ }
+
+ /**
+ * Return the {@link ServiceTask}s hosted by this service.
+ */
+ public List<ServiceTask> getTasks() {
+ return Collections.unmodifiableList(tasks);
+ }
+
+ /**
+ * Return the {@link ServiceTask}s hosted by this service.
+ */
+ public List<ServiceTask> getTasksToRemove() {
+ return Collections.unmodifiableList(tasksToRemove);
+ }
+
+ public void clearTasksToRemove() {
+ this.tasksToRemove.clear();
+ }
+
+ /**
+ * Add a {@link SimHost} to the scheduling pool of the compute service.
+ */
+ public void addHost(SimHost host) {
+ // Check if host is already known
+ if (hostToView.containsKey(host)) {
+ return;
+ }
+
+ HostView hv = new HostView(host);
+ HostModel model = host.getModel();
+
+ maxCores = Math.max(maxCores, model.coreCount());
+ maxMemory = Math.max(maxMemory, model.memoryCapacity());
+ hostToView.put(host, hv);
+
+ if (host.getState() == HostState.UP) {
+ availableHosts.add(hv);
+ }
+
+ scheduler.addHost(hv);
+ host.addListener(hostListener);
+ }
+
+ /**
+ * Remove a {@link SimHost} from the scheduling pool of the compute service.
+ */
+ public void removeHost(SimHost host) {
+ HostView view = hostToView.remove(host);
+ if (view != null) {
+ availableHosts.remove(view);
+ scheduler.removeHost(view);
+ host.removeListener(hostListener);
+ }
+ }
+
+ /**
+ * Lookup the {@link SimHost} that currently hosts the specified {@link ServiceTask}.
+ */
+ public SimHost lookupHost(ServiceTask task) {
+ return task.getHost();
+ }
+
+ /**
+ * Return the {@link SimHost}s that are registered with this service.
+ */
+ public Set<SimHost> getHosts() {
+ return Collections.unmodifiableSet(hostToView.keySet());
+ }
+
+ public InstantSource getClock() {
+ return this.clock;
+ }
+
+ /**
+ * Collect the statistics about the scheduler component of this service.
+ */
+ public SchedulerStats getSchedulerStats() {
+ return new SchedulerStats(
+ availableHosts.size(),
+ hostToView.size() - availableHosts.size(),
+ attemptsSuccess,
+ attemptsFailure,
+ tasksTotal,
+ tasksPending,
+ tasksActive,
+ tasksCompleted,
+ tasksTerminated);
+ }
+
+ @Override
+ public void close() {
+ if (isClosed) {
+ return;
+ }
+
+ isClosed = true;
+ pacer.cancel();
+ }
+
+ /**
+ * Enqueue the specified [task] to be scheduled onto a host.
+ */
+ SchedulingRequest schedule(ServiceTask task) {
+ LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid());
+
+ long now = clock.millis();
+ SchedulingRequest request = new SchedulingRequest(task, now);
+
+ task.launchedAt = Instant.ofEpochMilli(now);
+ taskQueue.add(request);
+ tasksPending++;
+ requestSchedulingCycle();
+ return request;
+ }
+
+ void delete(ServiceFlavor flavor) {
+ flavorById.remove(flavor.getUid());
+ flavors.remove(flavor);
+ }
+
+ void delete(ServiceImage image) {
+ imageById.remove(image.getUid());
+ images.remove(image);
+ }
+
+ void delete(ServiceTask task) {
+ completedTasks.remove(task);
+ taskById.remove(task.getUid());
+ tasks.remove(task);
+ }
+
+ /**
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
+ */
+ private void requestSchedulingCycle() {
+ // Bail out in case the queue is empty.
+ if (taskQueue.isEmpty()) {
+ return;
+ }
+
+ pacer.enqueue();
+ }
+
+ /**
+ * Run a single scheduling iteration.
+ */
+ private void doSchedule() {
+ // reorder tasks
+
+ while (!taskQueue.isEmpty()) {
+ SchedulingRequest request = taskQueue.peek();
+
+ if (request.isCancelled) {
+ taskQueue.poll();
+ tasksPending--;
+ continue;
+ }
+
+ final ServiceTask task = request.task;
+
+ if (task.getNumFailures() >= maxNumFailures) {
+ LOGGER.warn("task {} has been terminated because it failed {} times", task, task.getNumFailures());
+
+ taskQueue.poll();
+ tasksPending--;
+ tasksTerminated++;
+ task.setState(TaskState.TERMINATED);
+ tasksToRemove.add(task);
+ continue;
+ }
+
+ final ServiceFlavor flavor = task.getFlavor();
+ final HostView hv = scheduler.select(request.task);
+
+ if (hv == null || !hv.getHost().canFit(task)) {
+ LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task);
+
+ if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) {
+ // Remove the incoming image
+ taskQueue.poll();
+ tasksPending--;
+
+ LOGGER.warn("Failed to spawn {}: does not fit", task);
+
+ task.setState(TaskState.FAILED);
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ SimHost host = hv.getHost();
+
+ // Remove request from queue
+ taskQueue.poll();
+ tasksPending--;
+
+ LOGGER.info("Assigned task {} to host {}", task, host);
+
+ try {
+ task.host = host;
+
+ host.spawn(task);
+ // host.start(task);
+
+ tasksActive++;
+ attemptsSuccess++;
+
+ hv.instanceCount++;
+ hv.provisionedCores += flavor.getCoreCount();
+ hv.availableMemory -= flavor.getMemorySize();
+
+ activeTasks.put(task, host);
+ } catch (Exception cause) {
+ LOGGER.error("Failed to deploy VM", cause);
+ attemptsFailure++;
+ }
+ }
+ }
+
+ /**
+ * Builder class for a {@link ComputeService}.
+ */
+ public static class Builder {
+ private final Dispatcher dispatcher;
+ private final ComputeScheduler computeScheduler;
+ private Duration quantum = Duration.ofSeconds(1);
+ private int maxNumFailures = 10;
+
+ Builder(Dispatcher dispatcher, ComputeScheduler computeScheduler) {
+ this.dispatcher = dispatcher;
+ this.computeScheduler = computeScheduler;
+ }
+
+ /**
+ * Set the scheduling quantum of the service.
+ */
+ public Builder withQuantum(Duration quantum) {
+ this.quantum = quantum;
+ return this;
+ }
+
+ public Builder withMaxNumFailures(int maxNumFailures) {
+ this.maxNumFailures = maxNumFailures;
+ return this;
+ }
+
+ /**
+ * Build a {@link ComputeService}.
+ */
+ public ComputeService build() {
+ return new ComputeService(dispatcher, computeScheduler, quantum, maxNumFailures);
+ }
+ }
+
+ /**
+ * Implementation of {@link ComputeClient} using a {@link ComputeService}.
+ */
+ public static class ComputeClient {
+ private final ComputeService service;
+ private boolean isClosed;
+
+ ComputeClient(ComputeService service) {
+ this.service = service;
+ }
+
+ /**
+ * Method to check if the client is still open and throw an exception if it is not.
+ */
+ private void checkOpen() {
+ if (isClosed) {
+ throw new IllegalStateException("Client is already closed");
+ }
+ }
+
+ @NotNull
+ public List<Flavor> queryFlavors() {
+ checkOpen();
+ return new ArrayList<>(service.flavors);
+ }
+
+ public Flavor findFlavor(@NotNull UUID id) {
+ checkOpen();
+
+ return service.flavorById.get(id);
+ }
+
+ @NotNull
+ public Flavor newFlavor(@NotNull String name, int cpuCount, long memorySize, @NotNull Map<String, ?> meta) {
+ checkOpen();
+
+ final ComputeService service = this.service;
+ UUID uid = new UUID(service.clock.millis(), service.random.nextLong());
+ ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, meta);
+
+ service.flavorById.put(uid, flavor);
+ service.flavors.add(flavor);
+
+ return flavor;
+ }
+
+ @NotNull
+ public List<Image> queryImages() {
+ checkOpen();
+
+ return new ArrayList<>(service.images);
+ }
+
+ public Image findImage(@NotNull UUID id) {
+ checkOpen();
+
+ return service.imageById.get(id);
+ }
+
+ public Image newImage(@NotNull String name) {
+ return newImage(name, Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ @NotNull
+ public Image newImage(@NotNull String name, @NotNull Map<String, String> labels, @NotNull Map<String, ?> meta) {
+ checkOpen();
+
+ final ComputeService service = this.service;
+ UUID uid = new UUID(service.clock.millis(), service.random.nextLong());
+
+ ServiceImage image = new ServiceImage(service, uid, name, labels, meta);
+
+ service.imageById.put(uid, image);
+ service.images.add(image);
+
+ return image;
+ }
+
+ @NotNull
+ public ServiceTask newTask(
+ @NotNull String name,
+ @NotNull Flavor flavor,
+ @NotNull Workload workload,
+ @NotNull Map<String, ?> meta) {
+ checkOpen();
+
+ final ComputeService service = this.service;
+ UUID uid = new UUID(service.clock.millis(), service.random.nextLong());
+
+ final ServiceFlavor internalFlavor =
+ Objects.requireNonNull(service.flavorById.get(flavor.getUid()), "Unknown flavor");
+
+ ServiceTask task = new ServiceTask(service, uid, name, internalFlavor, workload, meta);
+
+ service.taskById.put(uid, task);
+ service.tasks.add(task);
+
+ service.tasksTotal++;
+
+ task.start();
+
+ return task;
+ }
+
+ @Nullable
+ public ServiceTask findTask(@NotNull UUID id) {
+ checkOpen();
+ return service.taskById.get(id);
+ }
+
+ @NotNull
+ public List<ServiceTask> queryTasks() {
+ checkOpen();
+
+ return new ArrayList<>(service.tasks);
+ }
+
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public String toString() {
+ return "ComputeService.Client";
+ }
+
+ @Nullable
+ public void rescheduleTask(@NotNull ServiceTask task, @NotNull Workload workload) {
+ ServiceTask internalTask = findTask(task.getUid());
+ // SimHost from = service.lookupHost(internalTask);
+
+ // from.delete(internalTask);
+
+ internalTask.host = null;
+
+ internalTask.setWorkload(workload);
+ internalTask.start();
+ }
+ }
+
+ /**
+ * A request to schedule a {@link ServiceTask} onto one of the {@link SimHost}s.
+ */
+ static class SchedulingRequest {
+ final ServiceTask task;
+ final long submitTime;
+
+ boolean isCancelled;
+
+ SchedulingRequest(ServiceTask task, long submitTime) {
+ this.task = task;
+ this.submitTime = submitTime;
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java
new file mode 100644
index 00000000..f4aa9c70
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/HostView.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.service;
+
+import org.opendc.compute.simulator.host.SimHost;
+
+/**
+ * A view of a {@link SimHost} as seen from the {@link ComputeService}.
+ */
+public class HostView {
+ private final SimHost host;
+ int instanceCount;
+ long availableMemory;
+ int provisionedCores;
+
+ /**
+ * Construct a {@link HostView} instance.
+ *
+ * @param host The host to create a view of.
+ */
+ public HostView(SimHost host) {
+ this.host = host;
+ this.availableMemory = host.getModel().memoryCapacity();
+ }
+
+ /**
+ * The {@link SimHost} this is a view of.
+ */
+ public SimHost getHost() {
+ return host;
+ }
+
+ /**
+ * Return the number of instances on this host.
+ */
+ public int getInstanceCount() {
+ return instanceCount;
+ }
+
+ /**
+ * Return the available memory of the host.
+ */
+ public long getAvailableMemory() {
+ return availableMemory;
+ }
+
+ /**
+ * Return the provisioned cores on the host.
+ */
+ public int getProvisionedCores() {
+ return provisionedCores;
+ }
+
+ @Override
+ public String toString() {
+ return "HostView[host=" + host + "]";
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
new file mode 100644
index 00000000..eddde87e
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
+import org.opendc.compute.api.Flavor;
+
+/**
+ * Implementation of {@link Flavor} provided by {@link ComputeService}.
+ */
+public final class ServiceFlavor implements Flavor {
+ private final ComputeService service;
+ private final UUID uid;
+ private final String name;
+ private final int coreCount;
+ private final long memorySize;
+ private final Map<String, ?> meta;
+
+ ServiceFlavor(ComputeService service, UUID uid, String name, int coreCount, long memorySize, Map<String, ?> meta) {
+ this.service = service;
+ this.uid = uid;
+ this.name = name;
+ this.coreCount = coreCount;
+ this.memorySize = memorySize;
+ this.meta = meta;
+ }
+
+ @Override
+ public int getCoreCount() {
+ return coreCount;
+ }
+
+ @Override
+ public long getMemorySize() {
+ return memorySize;
+ }
+
+ @NotNull
+ @Override
+ public UUID getUid() {
+ return uid;
+ }
+
+ @NotNull
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @NotNull
+ @Override
+ public Map<String, Object> getMeta() {
+ return Collections.unmodifiableMap(meta);
+ }
+
+ @Override
+ public void reload() {
+ // No-op: this object is the source-of-truth
+ }
+
+ @Override
+ public void delete() {
+ service.delete(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ServiceFlavor flavor = (ServiceFlavor) o;
+ return service.equals(flavor.service) && uid.equals(flavor.uid);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(service, uid);
+ }
+
+ @Override
+ public String toString() {
+ return "Flavor[uid=" + uid + ",name=" + name + "]";
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java
new file mode 100644
index 00000000..dffa4356
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceImage.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
+import org.opendc.compute.api.Image;
+
+/**
+ * Implementation of {@link Image} provided by {@link ComputeService}.
+ */
+public final class ServiceImage implements Image {
+ private final ComputeService service;
+ private final UUID uid;
+ private final String name;
+ private final Map<String, String> labels;
+ private final Map<String, ?> meta;
+
+ ServiceImage(ComputeService service, UUID uid, String name, Map<String, String> labels, Map<String, ?> meta) {
+ this.service = service;
+ this.uid = uid;
+ this.name = name;
+ this.labels = labels;
+ this.meta = meta;
+ }
+
+ @NotNull
+ @Override
+ public UUID getUid() {
+ return uid;
+ }
+
+ @NotNull
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @NotNull
+ @Override
+ public Map<String, Object> getMeta() {
+ return Collections.unmodifiableMap(meta);
+ }
+
+ @Override
+ public void reload() {
+ // No-op: this object is the source-of-truth
+ }
+
+ @Override
+ public void delete() {
+ service.delete(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ServiceImage image = (ServiceImage) o;
+ return service.equals(image.service) && uid.equals(image.uid);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(service, uid);
+ }
+
+ @Override
+ public String toString() {
+ return "Image[uid=" + uid + ",name=" + name + "]";
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java
new file mode 100644
index 00000000..f39142eb
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.service;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.opendc.compute.api.TaskState;
+import org.opendc.compute.simulator.TaskWatcher;
+import org.opendc.compute.simulator.host.SimHost;
+import org.opendc.simulator.compute.workload.Workload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ServiceTask} provided by {@link ComputeService}.
+ */
+public class ServiceTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTask.class);
+
+ private final ComputeService service;
+ private final UUID uid;
+
+ private final String name;
+ private final ServiceFlavor flavor;
+ public Workload workload;
+
+ private Map<String, ?> meta; // TODO: remove this
+
+ private final List<TaskWatcher> watchers = new ArrayList<>();
+ private TaskState state = TaskState.CREATED;
+ Instant launchedAt = null;
+ Instant createdAt;
+ Instant finishedAt;
+ SimHost host = null;
+ private ComputeService.SchedulingRequest request = null;
+
+ private int numFailures = 0;
+
+ ServiceTask(
+ ComputeService service,
+ UUID uid,
+ String name,
+ ServiceFlavor flavor,
+ Workload workload,
+ Map<String, ?> meta) {
+ this.service = service;
+ this.uid = uid;
+ this.name = name;
+ this.flavor = flavor;
+ this.workload = workload;
+ this.meta = meta;
+
+ this.createdAt = this.service.getClock().instant();
+ }
+
+ @NotNull
+ public UUID getUid() {
+ return uid;
+ }
+
+ @NotNull
+ public String getName() {
+ return name;
+ }
+
+ @NotNull
+ public ServiceFlavor getFlavor() {
+ return flavor;
+ }
+
+ @NotNull
+ public Map<String, Object> getMeta() {
+ return Collections.unmodifiableMap(meta);
+ }
+
+ public void setWorkload(Workload newWorkload) {
+ this.workload = newWorkload;
+ }
+
+ @NotNull
+ public TaskState getState() {
+ return state;
+ }
+
+ @Nullable
+ public Instant getLaunchedAt() {
+ return launchedAt;
+ }
+
+ @Nullable
+ public Instant getCreatedAt() {
+ return createdAt;
+ }
+
+ @Nullable
+ public Instant getFinishedAt() {
+ return finishedAt;
+ }
+
+ /**
+ * Return the {@link SimHost} on which the task is running or <code>null</code> if it is not running on a host.
+ */
+ public SimHost getHost() {
+ return host;
+ }
+
+ public void setHost(SimHost host) {
+ this.host = host;
+ }
+
+ public int getNumFailures() {
+ return this.numFailures;
+ }
+
+ public void start() {
+ switch (state) {
+ case PROVISIONING:
+ LOGGER.debug("User tried to start task but request is already pending: doing nothing");
+ case RUNNING:
+ LOGGER.debug("User tried to start task but task is already running");
+ break;
+ case COMPLETED:
+ case TERMINATED:
+ LOGGER.warn("User tried to start deleted task");
+ throw new IllegalStateException("Task is deleted");
+ case CREATED:
+ LOGGER.info("User requested to start task {}", uid);
+ setState(TaskState.PROVISIONING);
+ assert request == null : "Scheduling request already active";
+ request = service.schedule(this);
+ break;
+ case FAILED:
+ LOGGER.info("User requested to start task after failure {}", uid);
+ setState(TaskState.PROVISIONING);
+ request = service.schedule(this);
+ break;
+ }
+ }
+
+ public void watch(@NotNull TaskWatcher watcher) {
+ watchers.add(watcher);
+ }
+
+ public void unwatch(@NotNull TaskWatcher watcher) {
+ watchers.remove(watcher);
+ }
+
+ public void delete() {
+ cancelProvisioningRequest();
+ final SimHost host = this.host;
+ if (host != null) {
+ host.delete(this);
+ }
+ service.delete(this);
+
+ this.setState(TaskState.DELETED);
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ServiceTask task = (ServiceTask) o;
+ return service.equals(task.service) && uid.equals(task.uid);
+ }
+
+ public int hashCode() {
+ return Objects.hash(service, uid);
+ }
+
+ public String toString() {
+ return "Task[uid=" + uid + ",name=" + name + ",state=" + state + "]";
+ }
+
+ void setState(TaskState newState) {
+ if (this.state == newState) {
+ return;
+ }
+
+ for (TaskWatcher watcher : watchers) {
+ watcher.onStateChanged(this, newState);
+ }
+ if (newState == TaskState.FAILED) {
+ this.numFailures++;
+ }
+
+ if ((newState == TaskState.COMPLETED) || newState == TaskState.FAILED) {
+ this.finishedAt = this.service.getClock().instant();
+ }
+
+ this.state = newState;
+ }
+
+ /**
+ * Cancel the provisioning request if active.
+ */
+ private void cancelProvisioningRequest() {
+ final ComputeService.SchedulingRequest request = this.request;
+ if (request != null) {
+ this.request = null;
+ request.isCancelled = true;
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java
new file mode 100644
index 00000000..ea37f5f2
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry;
+
+/**
+ * Statistics about the CPUs of a guest.
+ *
+ * @param activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running.
+ * @param idleTime The cumulative time (in seconds) the CPUs of the guest were idle.
+ * @param stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host.
+ * @param lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines.
+ * @param capacity The available CPU capacity of the guest (in MHz).
+ * @param usage Amount of CPU resources (in MHz) actually used by the guest.
+ * @param utilization The utilization of the CPU resources (in %) relative to the total CPU capacity.
+ */
+public record GuestCpuStats(
+ long activeTime,
+ long idleTime,
+ long stealTime,
+ long lostTime,
+ float capacity,
+ float usage,
+ float utilization) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java
new file mode 100644
index 00000000..0d51e223
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestSystemStats.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry;
+
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * System-level statistics of a guest.
+ *
+ * @param uptime The cumulative uptime of the guest since last boot (in ms).
+ * @param downtime The cumulative downtime of the guest since last boot (in ms).
+ * @param bootTime The time at which the guest booted.
+ */
+public record GuestSystemStats(Duration uptime, Duration downtime, Instant bootTime) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java
new file mode 100644
index 00000000..3f2aab78
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostCpuStats.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry;
+
+/**
+ * Statistics about the CPUs of a host.
+ *
+ * @param activeTime The cumulative time (in seconds) that the CPUs of the host were actively running.
+ * @param idleTime The cumulative time (in seconds) the CPUs of the host were idle.
+ * @param stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to.
+ * @param lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines.
+ * @param capacity The available CPU capacity of the host (in MHz).
+ * @param demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU
+ * limits.
+ * @param usage Amount of CPU resources (in MHz) actually used by the host.
+ * @param utilization The utilization of the CPU resources (in %) relative to the total CPU capacity.
+ */
+public record HostCpuStats(
+ long activeTime,
+ long idleTime,
+ long stealTime,
+ long lostTime,
+ float capacity,
+ float demand,
+ float usage,
+ float utilization) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java
new file mode 100644
index 00000000..353e62fa
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/HostSystemStats.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry;
+
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * System-level statistics of a host.
+ *
+ * @param uptime The cumulative uptime of the host since last boot (in ms).
+ * @param downtime The cumulative downtime of the host since last boot (in ms).
+ * @param bootTime The time at which the task started.
+ * @param powerDraw Instantaneous power draw of the system (in W).
+ * @param energyUsage The cumulative energy usage of the system (in J).
+ * @param guestsTerminated The number of guests that are in a terminated state.
+ * @param guestsRunning The number of guests that are in a running state.
+ * @param guestsError The number of guests that are in an error state.
+ * @param guestsInvalid The number of guests that are in an unknown state.
+ */
+public record HostSystemStats(
+ Duration uptime,
+ Duration downtime,
+ Instant bootTime,
+ float powerDraw,
+ float energyUsage,
+ int guestsTerminated,
+ int guestsRunning,
+ int guestsError,
+ int guestsInvalid) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java
new file mode 100644
index 00000000..9d44a4b8
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/SchedulerStats.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry;
+
+/**
+ * Statistics about the scheduling component of the [ComputeService].
+ *
+ * @param hostsAvailable The number of hosts currently available for scheduling.
+ * @param hostsUnavailable The number of hosts unavailable for scheduling.
+ * @param attemptsSuccess Scheduling attempts that resulted into an allocation onto a host.
+ * @param attemptsFailure The number of failed scheduling attempt due to any reason
+ * @param tasksTotal The number of tasks registered with the service.
+ * @param tasksPending The number of tasks that are pending to be scheduled.
+ * @param tasksActive The number of tasks that are currently managed by the service and running.
+ */
+public record SchedulerStats(
+ int hostsAvailable,
+ int hostsUnavailable,
+ long attemptsSuccess,
+ long attemptsFailure,
+ int tasksTotal,
+ int tasksPending,
+ int tasksActive,
+ int tasksCompleted,
+ int tasksTerminated) {}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt
deleted file mode 100644
index ca72c910..00000000
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-/**
- * A mutable [ServiceRegistry].
- */
-public interface MutableServiceRegistry : ServiceRegistry {
- /**
- * Register [service] for the specified [name] in this registry.
- *
- * @param name The name of the service to register, which should follow the rules for domain names as defined by
- * DNS.
- * @param type The interface provided by the service.
- * @param service A reference to the actual implementation of the service.
- */
- public fun <T : Any> register(
- name: String,
- type: Class<T>,
- service: T,
- )
-
- /**
- * Remove the service with [name] and [type] from this registry.
- *
- * @param name The name of the service to remove, which should follow the rules for domain names as defined by DNS.
- * @param type The type of the service to remove.
- */
- public fun remove(
- name: String,
- type: Class<*>,
- )
-
- /**
- * Remove all services registered with [name].
- *
- * @param name The name of the services to remove, which should follow the rules for domain names as defined by DNS.
- */
- public fun remove(name: String)
-
- /**
- * Create a copy of the registry.
- */
- public override fun clone(): MutableServiceRegistry
-}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt
index 5a4bced1..e2f6c9d0 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt
@@ -23,26 +23,53 @@
package org.opendc.compute.simulator
/**
- * A read-only registry of services used during experiments to resolve services.
- *
- * The service registry is similar conceptually to the Domain Name System (DNS), which is a naming system used to
- * identify computers reachable via the Internet. The service registry should be used in a similar fashion.
+ * Implementation of the [ServiceRegistry] interface.
*/
-public interface ServiceRegistry {
- /**
- * Lookup the service with the specified [name] and [type].
- *
- * @param name The name of the service to resolve, which should follow the rules for domain names as defined by DNS.
- * @param type The type of the service to resolve, identified by the interface that is implemented by the service.
- * @return The service with specified [name] and implementing [type] or `null` if it does not exist.
- */
+public class ServiceRegistry(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) {
public fun <T : Any> resolve(
name: String,
type: Class<T>,
- ): T?
+ ): T? {
+ val servicesForName = registry[name] ?: return null
+
+ @Suppress("UNCHECKED_CAST")
+ return servicesForName[type] as T?
+ }
+
+ public fun <T : Any> register(
+ name: String,
+ type: Class<T>,
+ service: T,
+ ) {
+ val services = registry.computeIfAbsent(name) { mutableMapOf() }
+
+ if (type in services) {
+ throw IllegalStateException("Duplicate service $type registered for name $name")
+ }
+
+ services[type] = service
+ }
+
+ public fun remove(
+ name: String,
+ type: Class<*>,
+ ) {
+ val services = registry[name] ?: return
+ services.remove(type)
+ }
+
+ public fun remove(name: String) {
+ registry.remove(name)
+ }
+
+ public fun clone(): ServiceRegistry {
+ val res = mutableMapOf<String, MutableMap<Class<*>, Any>>()
+ registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() }
+ return ServiceRegistry(res)
+ }
- /**
- * Create a copy of the registry.
- */
- public fun clone(): ServiceRegistry
+ override fun toString(): String {
+ val entries = registry.map { "${it.key}=${it.value}" }.joinToString()
+ return "ServiceRegistry{$entries}"
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt
deleted file mode 100644
index bf3ee43f..00000000
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-/**
- * Implementation of the [MutableServiceRegistry] interface.
- */
-internal class ServiceRegistryImpl(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) :
- MutableServiceRegistry {
- override fun <T : Any> resolve(
- name: String,
- type: Class<T>,
- ): T? {
- val servicesForName = registry[name] ?: return null
-
- @Suppress("UNCHECKED_CAST")
- return servicesForName[type] as T?
- }
-
- override fun <T : Any> register(
- name: String,
- type: Class<T>,
- service: T,
- ) {
- val services = registry.computeIfAbsent(name) { mutableMapOf() }
-
- if (type in services) {
- throw IllegalStateException("Duplicate service $type registered for name $name")
- }
-
- services[type] = service
- }
-
- override fun remove(
- name: String,
- type: Class<*>,
- ) {
- val services = registry[name] ?: return
- services.remove(type)
- }
-
- override fun remove(name: String) {
- registry.remove(name)
- }
-
- override fun clone(): MutableServiceRegistry {
- val res = mutableMapOf<String, MutableMap<Class<*>, Any>>()
- registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() }
- return ServiceRegistryImpl(res)
- }
-
- override fun toString(): String {
- val entries = registry.map { "${it.key}=${it.value}" }.joinToString()
- return "ServiceRegistry{$entries}"
- }
-}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
deleted file mode 100644
index e681403c..00000000
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Task
-import org.opendc.compute.api.TaskState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostModel
-import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.driver.telemetry.GuestCpuStats
-import org.opendc.compute.service.driver.telemetry.GuestSystemStats
-import org.opendc.compute.service.driver.telemetry.HostCpuStats
-import org.opendc.compute.service.driver.telemetry.HostSystemStats
-import org.opendc.compute.simulator.internal.DefaultWorkloadMapper
-import org.opendc.compute.simulator.internal.Guest
-import org.opendc.compute.simulator.internal.GuestListener
-import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.compute.workload.SimWorkloads
-import java.time.Duration
-import java.time.Instant
-import java.time.InstantSource
-import java.util.UUID
-import java.util.function.Supplier
-
-/**
- * A [Host] implementation that simulates virtual machines on a physical machine using [SimHypervisor].
- *
- * @param uid The unique identifier of the host.
- * @param name The name of the host.
- * @param meta The metadata of the host.
- * @param clock The (virtual) clock used to track time.
- * @param machine The [SimBareMetalMachine] on which the host runs.
- * @param hypervisor The [SimHypervisor] to run on top of the machine.
- * @param mapper A [SimWorkloadMapper] to map a [Task] to a [SimWorkload].
- * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor.
- * @param optimize A flag to indicate to optimize the machine models of the virtual machines.
- */
-public class SimHost(
- private val uid: UUID,
- private val name: String,
- private val meta: Map<String, Any>,
- private val clock: InstantSource,
- private val machine: SimBareMetalMachine,
- private val hypervisor: SimHypervisor,
- private val mapper: SimWorkloadMapper = DefaultWorkloadMapper,
- private val bootModel: Supplier<SimWorkload?> = Supplier { null },
- private val optimize: Boolean = false,
-) : Host, AutoCloseable {
- /**
- * The event listeners registered with this host.
- */
- private val listeners = mutableListOf<HostListener>()
-
- /**
- * The virtual machines running on the hypervisor.
- */
- private val guests = HashMap<Task, Guest>()
- private val localGuests = mutableListOf<Guest>()
-
- private var localState: HostState = HostState.DOWN
- set(value) {
- if (value != field) {
- listeners.forEach { it.onStateChanged(this, value) }
- }
- field = value
- }
-
- private val model: HostModel =
- HostModel(
- machine.model.cpu.totalCapacity,
- machine.model.cpu.coreCount,
- machine.model.memory.size,
- )
-
- /**
- * The [GuestListener] that listens for guest events.
- */
- private val guestListener =
- object : GuestListener {
- override fun onStart(guest: Guest) {
- listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) }
- }
-
- override fun onStop(guest: Guest) {
- listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) }
- }
- }
-
- init {
- launch()
- }
-
- override fun getUid(): UUID {
- return uid
- }
-
- override fun getName(): String {
- return name
- }
-
- override fun getModel(): HostModel {
- return model
- }
-
- override fun getMeta(): Map<String, *> {
- return meta
- }
-
- override fun getState(): HostState {
- return localState
- }
-
- override fun getInstances(): Set<Task> {
- return guests.keys
- }
-
- override fun canFit(task: Task): Boolean {
- val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize
- val enoughCpus = model.coreCount >= task.flavor.coreCount
- val canFit = hypervisor.canFit(task.flavor.toMachineModel())
-
- return sufficientMemory && enoughCpus && canFit
- }
-
- override fun spawn(task: Task) {
- guests.computeIfAbsent(task) { key ->
- require(canFit(key)) { "Task does not fit" }
-
- val machine = hypervisor.newMachine(key.flavor.toMachineModel())
- val newGuest =
- Guest(
- clock,
- this,
- hypervisor,
- mapper,
- guestListener,
- task,
- machine,
- )
-
- localGuests.add(newGuest)
- newGuest
- }
- }
-
- override fun contains(task: Task): Boolean {
- return task in guests
- }
-
- override fun start(task: Task) {
- val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" }
- guest.start()
- }
-
- override fun stop(task: Task) {
- val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" }
- guest.stop()
- }
-
- override fun delete(task: Task) {
- val guest = guests[task] ?: return
- guest.delete()
-
- guests.remove(task)
- localGuests.remove(guest)
- }
-
- override fun addListener(listener: HostListener) {
- listeners.add(listener)
- }
-
- override fun removeListener(listener: HostListener) {
- listeners.remove(listener)
- }
-
- override fun close() {
- reset(HostState.DOWN)
- machine.cancel()
- }
-
- override fun getSystemStats(): HostSystemStats {
- updateUptime()
-
- var terminated = 0
- var running = 0
- var error = 0
- var invalid = 0
-
- val guests = localGuests.listIterator()
- for (guest in guests) {
- when (guest.state) {
- TaskState.TERMINATED -> terminated++
- TaskState.RUNNING -> running++
- TaskState.ERROR -> error++
- TaskState.DELETED -> {
- // Remove guests that have been deleted
- this.guests.remove(guest.task)
- guests.remove()
- }
- else -> invalid++
- }
- }
-
- return HostSystemStats(
- Duration.ofMillis(localUptime),
- Duration.ofMillis(localDowntime),
- localBootTime,
- machine.psu.powerDraw,
- machine.psu.energyUsage,
- terminated,
- running,
- error,
- invalid,
- )
- }
-
- override fun getSystemStats(task: Task): GuestSystemStats {
- val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" }
- return guest.getSystemStats()
- }
-
- override fun getCpuStats(): HostCpuStats {
- val counters = hypervisor.counters
- counters.sync()
-
- return HostCpuStats(
- counters.cpuActiveTime,
- counters.cpuIdleTime,
- counters.cpuStealTime,
- counters.cpuLostTime,
- hypervisor.cpuCapacity,
- hypervisor.cpuDemand,
- hypervisor.cpuUsage,
- hypervisor.cpuUsage / localCpuLimit,
- )
- }
-
- override fun getCpuStats(task: Task): GuestCpuStats {
- val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" }
- return guest.getCpuStats()
- }
-
- override fun hashCode(): Int = uid.hashCode()
-
- override fun equals(other: Any?): Boolean {
- return other is SimHost && uid == other.uid
- }
-
- override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
-
- public fun fail() {
- reset(HostState.ERROR)
-
- for (guest in localGuests) {
- guest.fail()
- }
- }
-
- public fun recover() {
- updateUptime()
-
- launch()
- }
-
- /**
- * The [SimMachineContext] that represents the machine running the hypervisor.
- */
- private var ctx: SimMachineContext? = null
-
- /**
- * Launch the hypervisor.
- */
- private fun launch() {
- check(ctx == null) { "Concurrent hypervisor running" }
-
- val bootWorkload = bootModel.get()
- val hypervisor = hypervisor
- val hypervisorWorkload =
- object : SimWorkload by hypervisor {
- override fun onStart(ctx: SimMachineContext) {
- try {
- localBootTime = clock.instant()
- localState = HostState.UP
- hypervisor.onStart(ctx)
-
- // Recover the guests that were running on the hypervisor.
- for (guest in localGuests) {
- guest.recover()
- }
- } catch (cause: Throwable) {
- localState = HostState.ERROR
- throw cause
- }
- }
- }
-
- val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload
-
- // Launch hypervisor onto machine
- ctx =
- machine.startWorkload(workload, emptyMap()) { cause ->
- localState = if (cause != null) HostState.ERROR else HostState.DOWN
- ctx = null
- }
- }
-
- /**
- * Reset the machine.
- */
- private fun reset(state: HostState) {
- updateUptime()
-
- // Stop the hypervisor
- ctx?.shutdown()
- localState = state
- }
-
- /**
- * Convert flavor to machine model.
- */
- private fun Flavor.toMachineModel(): MachineModel {
- return MachineModel(machine.model.cpu, MemoryUnit("Generic", "Generic", 3200.0, memorySize))
- }
-
- private var localLastReport = clock.millis()
- private var localUptime = 0L
- private var localDowntime = 0L
- private var localBootTime: Instant? = null
- private val localCpuLimit = machine.model.cpu.totalCapacity
-
- /**
- * Helper function to track the uptime of a machine.
- */
- private fun updateUptime() {
- val now = clock.millis()
- val duration = now - localLastReport
- localLastReport = now
-
- if (localState == HostState.UP) {
- localUptime += duration
- } else if (localState == HostState.ERROR) {
- // Only increment downtime if the machine is in a failure state
- localDowntime += duration
- }
-
- val guests = localGuests
- for (i in guests.indices) {
- guests[i].updateUptime()
- }
- }
-}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/TaskWatcher.kt
index a85091a0..9fe4dff5 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/TaskWatcher.kt
@@ -22,16 +22,21 @@
package org.opendc.compute.simulator
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Task
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.simulator.service.ServiceTask
/**
- * A [SimWorkloadMapper] is responsible for mapping a [Task] and [Image] to a [SimWorkload] that can be simulated.
+ * An interface used to watch the state of [ServiceTask] instances.
*/
-public fun interface SimWorkloadMapper {
+public interface TaskWatcher {
/**
- * Map the specified [task] to a [SimWorkload] that can be simulated.
+ * This method is invoked when the state of a [ServiceTask] changes.
+ *
+ * @param task The task whose state has changed.
+ * @param newState The new state of the task.
*/
- public fun createWorkload(task: Task): SimWorkload
+ public fun onStateChanged(
+ task: ServiceTask,
+ newState: TaskState,
+ ) {}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
new file mode 100644
index 00000000..31ff384c
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -0,0 +1,369 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.host
+
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
+import org.opendc.compute.simulator.service.ServiceTask
+import org.opendc.compute.simulator.telemetry.GuestCpuStats
+import org.opendc.compute.simulator.telemetry.GuestSystemStats
+import org.opendc.compute.simulator.telemetry.HostCpuStats
+import org.opendc.compute.simulator.telemetry.HostSystemStats
+import org.opendc.simulator.compute.cpu.CpuPowerModel
+import org.opendc.simulator.compute.machine.SimMachine
+import org.opendc.simulator.compute.models.MachineModel
+import org.opendc.simulator.compute.models.MemoryUnit
+import org.opendc.simulator.engine.FlowGraph
+import java.time.Duration
+import java.time.Instant
+import java.time.InstantSource
+import java.util.UUID
+
+/**
+ * A [Host] implementation that simulates virtual machines on a physical machine.
+ *
+ * @param uid The unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param clock The (virtual) clock used to track time.
+ * @param graph The Flow Graph that the Host is part of
+ * @param machineModel The static model of the host
+ * @param powerModel The static powerModel of the CPU TODO: can this be combined with machinemodel?
+ * @constructor Create empty Sim host
+ */
+public class SimHost(
+ private val uid: UUID,
+ private val name: String,
+ private val meta: Map<String, Any>,
+ private val clock: InstantSource,
+ private val graph: FlowGraph,
+ private val machineModel: MachineModel,
+ private val powerModel: CpuPowerModel,
+) : AutoCloseable {
+ /**
+ * The event listeners registered with this host.
+ */
+ private val hostListeners = mutableListOf<HostListener>()
+
+ /**
+ * The virtual machines running on the hypervisor.
+ */
+ private val taskToGuestMap = HashMap<ServiceTask, Guest>()
+ private val guests = mutableListOf<Guest>()
+
+ private var hostState: HostState = HostState.DOWN
+ set(value) {
+ if (value != field) {
+ hostListeners.forEach { it.onStateChanged(this, value) }
+ }
+ field = value
+ }
+
+ private val model: HostModel =
+ HostModel(
+ machineModel.cpu.totalCapacity,
+ machineModel.cpu.coreCount,
+ machineModel.memory.size,
+ )
+
+ private var simMachine: SimMachine? = null
+
+ /**
+ * The [GuestListener] that listens for guest events.
+ */
+ private val guestListener =
+ object : GuestListener {
+ override fun onStart(guest: Guest) {
+ hostListeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) }
+ }
+
+ override fun onStop(guest: Guest) {
+ hostListeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) }
+ }
+ }
+
+ private var lastReport = clock.millis()
+ private var totalUptime = 0L
+ private var totalDowntime = 0L
+ private var bootTime: Instant? = null
+ private val cpuLimit = machineModel.cpu.totalCapacity
+
+ init {
+ launch()
+ }
+
+ /**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ bootTime = this.clock.instant()
+ hostState = HostState.UP
+
+ if (this.simMachine != null) {
+ return
+ }
+
+ this.simMachine =
+ SimMachine(
+ this.graph,
+ this.machineModel,
+ this.powerModel,
+ ) { cause ->
+ hostState = if (cause != null) HostState.ERROR else HostState.DOWN
+ }
+ }
+
+ override fun close() {
+ reset(HostState.DOWN)
+ }
+
+ public fun fail() {
+ reset(HostState.ERROR)
+
+ // Fail the guest and delete them
+ // This weird loop is the only way I have been able to make it work.
+ while (guests.size > 0) {
+ val guest = guests[0]
+ guest.fail()
+ this.delete(guest.task)
+ }
+ }
+
+ public fun recover() {
+ updateUptime()
+
+ launch()
+ }
+
+ /**
+ * Reset the machine.
+ */
+ private fun reset(state: HostState) {
+ updateUptime()
+
+ // Stop the hypervisor
+ hostState = state
+ }
+
+ public fun getUid(): UUID {
+ return uid
+ }
+
+ public fun getName(): String {
+ return name
+ }
+
+ public fun getModel(): HostModel {
+ return model
+ }
+
+ public fun getMeta(): Map<String, *> {
+ return meta
+ }
+
+ public fun getState(): HostState {
+ return hostState
+ }
+
+ public fun getInstances(): Set<ServiceTask> {
+ return taskToGuestMap.keys
+ }
+
+ public fun getGuests(): List<Guest> {
+ return this.guests
+ }
+
+ public fun canFit(task: ServiceTask): Boolean {
+ val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize
+ val enoughCpus = model.coreCount >= task.flavor.coreCount
+ val canFit = simMachine!!.canFit(task.flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
+ /**
+ * Spawn A Virtual machine that run the Task and put this Task as a Guest on it
+ *
+ * @param task
+ */
+ public fun spawn(task: ServiceTask) {
+ assert(simMachine != null) { "Tried start task $task while no SimMachine is active" }
+
+ require(canFit(task)) { "Task does not fit" }
+
+ val newGuest =
+ Guest(
+ clock,
+ this,
+ guestListener,
+ task,
+ simMachine!!,
+ )
+
+ guests.add(newGuest)
+ newGuest.start()
+
+ taskToGuestMap.computeIfAbsent(task) { newGuest }
+ }
+
+ public fun contains(task: ServiceTask): Boolean {
+ return task in taskToGuestMap
+ }
+
+ public fun start(task: ServiceTask) {
+ val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" }
+ guest.start()
+ }
+
+ public fun stop(task: ServiceTask) {
+ val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" }
+ guest.stop()
+ }
+
+ public fun delete(task: ServiceTask) {
+ val guest = taskToGuestMap[task] ?: return
+ guest.delete()
+
+ taskToGuestMap.remove(task)
+ guests.remove(guest)
+ task.host = null
+ }
+
+ public fun removeTask(task: ServiceTask) {
+ val guest = taskToGuestMap[task] ?: return
+ guest.delete()
+
+ taskToGuestMap.remove(task)
+ guests.remove(guest)
+ }
+
+ public fun addListener(listener: HostListener) {
+ hostListeners.add(listener)
+ }
+
+ public fun removeListener(listener: HostListener) {
+ hostListeners.remove(listener)
+ }
+
+ public fun getSystemStats(): HostSystemStats {
+ updateUptime()
+ this.simMachine!!.psu.updateCounters()
+
+ var terminated = 0
+ var running = 0
+ var failed = 0
+ var invalid = 0
+ var completed = 0
+
+ val guests = guests.listIterator()
+ for (guest in guests) {
+ when (guest.state) {
+ TaskState.RUNNING -> running++
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.TERMINATED -> {
+ failed++
+ // Remove guests that have been deleted
+ this.taskToGuestMap.remove(guest.task)
+ guests.remove()
+ }
+ else -> invalid++
+ }
+ }
+
+ return HostSystemStats(
+ Duration.ofMillis(totalUptime),
+ Duration.ofMillis(totalDowntime),
+ bootTime,
+ simMachine!!.psu.powerDraw,
+ simMachine!!.psu.energyUsage,
+ terminated,
+ running,
+ failed,
+ invalid,
+ )
+ }
+
+ public fun getSystemStats(task: ServiceTask): GuestSystemStats {
+ val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" }
+ return guest.getSystemStats()
+ }
+
+ public fun getCpuStats(): HostCpuStats {
+ simMachine!!.cpu.updateCounters(this.clock.millis())
+
+ val counters = simMachine!!.performanceCounters
+
+ return HostCpuStats(
+ counters.cpuActiveTime,
+ counters.cpuIdleTime,
+ counters.cpuStealTime,
+ counters.cpuLostTime,
+ counters.cpuCapacity,
+ counters.cpuDemand,
+ counters.cpuSupply,
+ counters.cpuSupply / cpuLimit,
+ )
+ }
+
+ public fun getCpuStats(task: ServiceTask): GuestCpuStats {
+ val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.uid} at host $uid" }
+ return guest.getCpuStats()
+ }
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun equals(other: Any?): Boolean {
+ return other is SimHost && uid == other.uid
+ }
+
+ override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): MachineModel {
+ return MachineModel(simMachine!!.machineModel.cpu, MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+ }
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun updateUptime() {
+ val now = clock.millis()
+ val duration = now - lastReport
+ lastReport = now
+
+ if (hostState == HostState.UP) {
+ totalUptime += duration
+ } else if (hostState == HostState.ERROR) {
+ // Only increment downtime if the machine is in a failure state
+ totalDowntime += duration
+ }
+
+ val guests = guests
+ for (i in guests.indices) {
+ guests[i].updateUptime()
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index cf6c146a..3a923222 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -23,15 +23,16 @@
package org.opendc.compute.simulator.internal
import mu.KotlinLogging
-import org.opendc.compute.api.Task
import org.opendc.compute.api.TaskState
-import org.opendc.compute.service.driver.telemetry.GuestCpuStats
-import org.opendc.compute.service.driver.telemetry.GuestSystemStats
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.SimWorkloadMapper
-import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.compute.simulator.host.SimHost
+import org.opendc.compute.simulator.service.ServiceTask
+import org.opendc.compute.simulator.telemetry.GuestCpuStats
+import org.opendc.compute.simulator.telemetry.GuestSystemStats
+import org.opendc.simulator.compute.machine.SimMachine
+import org.opendc.simulator.compute.machine.VirtualMachine
+import org.opendc.simulator.compute.workload.ChainWorkload
+import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.TraceWorkload
import java.time.Duration
import java.time.Instant
import java.time.InstantSource
@@ -39,14 +40,12 @@ import java.time.InstantSource
/**
* A virtual machine instance that is managed by a [SimHost].
*/
-internal class Guest(
+public class Guest(
private val clock: InstantSource,
- val host: SimHost,
- private val hypervisor: SimHypervisor,
- private val mapper: SimWorkloadMapper,
+ public val host: SimHost,
private val listener: GuestListener,
- val task: Task,
- val machine: SimHypervisor.SimVirtualMachine,
+ public val task: ServiceTask,
+ public val simMachine: SimMachine,
) {
/**
* The state of the [Guest].
@@ -54,50 +53,133 @@ internal class Guest(
* [TaskState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
* a task.
*/
- var state: TaskState = TaskState.TERMINATED
+ public var state: TaskState = TaskState.CREATED
private set
/**
+ * The [VirtualMachine] on which the task is currently running
+ */
+ public var virtualMachine: VirtualMachine? = null
+
+ private var uptime = 0L
+ private var downtime = 0L
+ private var lastReport = clock.millis()
+ private var bootTime: Instant? = null
+ private val cpuLimit = simMachine.cpu.cpuModel.totalCapacity
+
+ /**
* Start the guest.
*/
- fun start() {
+ public fun start() {
when (state) {
- TaskState.TERMINATED, TaskState.ERROR -> {
+ TaskState.CREATED, TaskState.FAILED -> {
LOGGER.info { "User requested to start task ${task.uid}" }
doStart()
}
TaskState.RUNNING -> return
- TaskState.DELETED -> {
- LOGGER.warn { "User tried to start deleted task" }
- throw IllegalArgumentException("Task is deleted")
+ TaskState.COMPLETED, TaskState.TERMINATED -> {
+ LOGGER.warn { "User tried to start a finished task" }
+ throw IllegalArgumentException("Task is already finished")
}
else -> assert(false) { "Invalid state transition" }
}
}
/**
+ * Launch the guest on the simulated Virtual machine
+ */
+ private fun doStart() {
+ assert(virtualMachine == null) { "Concurrent job is already running" }
+
+ onStart()
+
+ val bootworkload =
+ TraceWorkload(
+ ArrayList(
+ listOf(
+ TraceFragment(
+ 1000000L,
+ 100000.0,
+ 1,
+ ),
+ ),
+ ),
+ 0,
+ 0,
+ 0.0,
+ )
+ val newChainWorkload =
+ ChainWorkload(
+ ArrayList(listOf(task.workload)),
+ task.workload.checkpointInterval,
+ task.workload.checkpointDuration,
+ task.workload.checkpointIntervalScaling,
+ )
+
+ virtualMachine =
+ simMachine.startWorkload(newChainWorkload) { cause ->
+ onStop(if (cause != null) TaskState.FAILED else TaskState.COMPLETED)
+ }
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ bootTime = clock.instant()
+ state = TaskState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
* Stop the guest.
*/
- fun stop() {
+ public fun stop() {
when (state) {
- TaskState.RUNNING -> doStop(TaskState.TERMINATED)
- TaskState.ERROR -> doRecover()
- TaskState.TERMINATED, TaskState.DELETED -> return
+ TaskState.RUNNING -> doStop(TaskState.COMPLETED)
+ TaskState.FAILED -> state = TaskState.TERMINATED
+ TaskState.COMPLETED, TaskState.TERMINATED -> return
else -> assert(false) { "Invalid state transition" }
}
}
/**
+ * Attempt to stop the task and put it into [target] state.
+ */
+ private fun doStop(target: TaskState) {
+ assert(virtualMachine != null) { "Invalid job state" }
+ val virtualMachine = this.virtualMachine ?: return
+ if (target == TaskState.FAILED) {
+ virtualMachine.shutdown(Exception("Task has failed"))
+ } else {
+ virtualMachine.shutdown()
+ }
+
+ this.virtualMachine = null
+
+ this.state = target
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: TaskState) {
+ updateUptime()
+
+ state = target
+ listener.onStop(this)
+ }
+
+ /**
* Delete the guest.
*
* This operation will stop the guest if it is running on the host and remove all resources associated with the
* guest.
*/
- fun delete() {
+ public fun delete() {
stop()
- state = TaskState.DELETED
- hypervisor.removeMachine(machine)
+ state = TaskState.FAILED
}
/**
@@ -105,19 +187,19 @@ internal class Guest(
*
* This operation forcibly stops the guest and puts the task into an error state.
*/
- fun fail() {
+ public fun fail() {
if (state != TaskState.RUNNING) {
return
}
- doStop(TaskState.ERROR)
+ doStop(TaskState.FAILED)
}
/**
* Recover the guest if it is in an error state.
*/
- fun recover() {
- if (state != TaskState.ERROR) {
+ public fun recover() {
+ if (state != TaskState.FAILED) {
return
}
@@ -127,117 +209,46 @@ internal class Guest(
/**
* Obtain the system statistics of this guest.
*/
- fun getSystemStats(): GuestSystemStats {
+ public fun getSystemStats(): GuestSystemStats {
updateUptime()
return GuestSystemStats(
- Duration.ofMillis(localUptime),
- Duration.ofMillis(localDowntime),
- localBootTime,
+ Duration.ofMillis(uptime),
+ Duration.ofMillis(downtime),
+ bootTime,
)
}
/**
* Obtain the CPU statistics of this guest.
*/
- fun getCpuStats(): GuestCpuStats {
- val counters = machine.counters
- counters.sync()
+ public fun getCpuStats(): GuestCpuStats {
+ virtualMachine!!.updateCounters(this.clock.millis())
+ val counters = virtualMachine!!.performanceCounters
return GuestCpuStats(
counters.cpuActiveTime / 1000L,
counters.cpuIdleTime / 1000L,
counters.cpuStealTime / 1000L,
counters.cpuLostTime / 1000L,
- machine.cpuCapacity,
- machine.cpuUsage,
- machine.cpuUsage / localCpuLimit,
+ counters.cpuCapacity,
+ counters.cpuSupply,
+ counters.cpuSupply / cpuLimit,
)
}
/**
- * The [SimMachineContext] representing the current active virtual machine instance or `null` if no virtual machine
- * is active.
- */
- private var ctx: SimMachineContext? = null
-
- /**
- * Launch the guest on the simulated
- */
- private fun doStart() {
- assert(ctx == null) { "Concurrent job running" }
-
- onStart()
-
- val workload: SimWorkload = mapper.createWorkload(task)
- workload.setOffset(clock.millis())
- val meta = mapOf("driver" to host, "task" to task) + task.meta
- ctx =
- machine.startWorkload(workload, meta) { cause ->
- onStop(if (cause != null) TaskState.ERROR else TaskState.TERMINATED)
- ctx = null
- }
- }
-
- /**
- * Attempt to stop the task and put it into [target] state.
- */
- private fun doStop(target: TaskState) {
- assert(ctx != null) { "Invalid job state" }
- val ctx = ctx ?: return
- if (target == TaskState.ERROR) {
- ctx.shutdown(Exception("Stopped because of ERROR"))
- } else {
- ctx.shutdown()
- }
-
- state = target
- }
-
- /**
- * Attempt to recover from an error state.
- */
- private fun doRecover() {
- state = TaskState.TERMINATED
- }
-
- /**
- * This method is invoked when the guest was started on the host and has booted into a running state.
- */
- private fun onStart() {
- localBootTime = clock.instant()
- state = TaskState.RUNNING
- listener.onStart(this)
- }
-
- /**
- * This method is invoked when the guest stopped.
- */
- private fun onStop(target: TaskState) {
- updateUptime()
-
- state = target
- listener.onStop(this)
- }
-
- private var localUptime = 0L
- private var localDowntime = 0L
- private var localLastReport = clock.millis()
- private var localBootTime: Instant? = null
- private val localCpuLimit = machine.model.cpu.totalCapacity
-
- /**
* Helper function to track the uptime and downtime of the guest.
*/
- fun updateUptime() {
+ public fun updateUptime() {
val now = clock.millis()
- val duration = now - localLastReport
- localLastReport = now
+ val duration = now - lastReport
+ lastReport = now
if (state == TaskState.RUNNING) {
- localUptime += duration
- } else if (state == TaskState.ERROR) {
- localDowntime += duration
+ uptime += duration
+ } else if (state == TaskState.FAILED) {
+ downtime += duration
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
index e6d0fdad..895d78f9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -25,14 +25,14 @@ package org.opendc.compute.simulator.internal
/**
* Helper interface to listen for [Guest] events.
*/
-internal interface GuestListener {
+public interface GuestListener {
/**
* This method is invoked when the guest machine is running.
*/
- fun onStart(guest: Guest)
+ public fun onStart(guest: Guest)
/**
* This method is invoked when the guest machine is stopped.
*/
- fun onStop(guest: Guest)
+ public fun onStop(guest: Guest)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
index f1123742..f295f522 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
@@ -23,9 +23,9 @@
package org.opendc.compute.simulator.provisioner
import org.opendc.compute.carbon.CarbonTrace
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.telemetry.ComputeMetricReader
-import org.opendc.compute.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.telemetry.ComputeMetricReader
+import org.opendc.compute.simulator.telemetry.ComputeMonitor
import java.time.Duration
/**
@@ -44,7 +44,15 @@ public class ComputeMonitorProvisioningStep(
requireNotNull(
ctx.registry.resolve(serviceDomain, ComputeService::class.java),
) { "Compute service $serviceDomain does not exist" }
- val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval, startTime, carbonTrace)
+ val metricReader =
+ ComputeMetricReader(
+ ctx.dispatcher,
+ service,
+ monitor,
+ exportInterval,
+ startTime,
+ carbonTrace,
+ )
return AutoCloseable { metricReader.close() }
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt
index 645c9d46..6bdb131f 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.provisioner
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.service.ComputeService
import java.time.Duration
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
index afde8219..07db3d26 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
@@ -25,9 +25,8 @@
package org.opendc.compute.simulator.provisioner
import org.opendc.compute.carbon.CarbonTrace
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.telemetry.ComputeMonitor
import org.opendc.compute.topology.specs.HostSpec
import java.time.Duration
@@ -41,7 +40,7 @@ import java.time.Duration
public fun setupComputeService(
serviceDomain: String,
scheduler: (ProvisioningContext) -> ComputeScheduler,
- schedulingQuantum: Duration = Duration.ofMinutes(5),
+ schedulingQuantum: Duration = Duration.ofSeconds(1),
maxNumFailures: Int = 10,
): ProvisioningStep {
return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum, maxNumFailures)
@@ -76,7 +75,6 @@ public fun registerComputeMonitor(
public fun setupHosts(
serviceDomain: String,
specs: List<HostSpec>,
- optimize: Boolean = false,
): ProvisioningStep {
- return HostsProvisioningStep(serviceDomain, specs, optimize)
+ return HostsProvisioningStep(serviceDomain, specs)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
index a80be634..19674d5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
@@ -22,13 +22,10 @@
package org.opendc.compute.simulator.provisioner
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.host.SimHost
+import org.opendc.compute.simulator.service.ComputeService
import org.opendc.compute.topology.specs.HostSpec
-import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.flow2.FlowEngine
-import java.util.SplittableRandom
+import org.opendc.simulator.engine.FlowEngine
/**
* A [ProvisioningStep] that provisions a list of hosts for a [ComputeService].
@@ -40,30 +37,27 @@ import java.util.SplittableRandom
public class HostsProvisioningStep internal constructor(
private val serviceDomain: String,
private val specs: List<HostSpec>,
- private val optimize: Boolean,
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service =
requireNotNull(
ctx.registry.resolve(serviceDomain, ComputeService::class.java),
) { "Compute service $serviceDomain does not exist" }
- val engine = FlowEngine.create(ctx.dispatcher)
- val graph = engine.newGraph()
val hosts = mutableSetOf<SimHost>()
- for (spec in specs) {
- val machine = SimBareMetalMachine.create(graph, spec.model, spec.psuFactory)
- val hypervisor = SimHypervisor.create(spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong()))
+ val flowEngine = FlowEngine.create(ctx.dispatcher)
+ val flowGraph = flowEngine.newGraph()
+ for (spec in specs) {
val host =
SimHost(
spec.uid,
spec.name,
spec.meta,
ctx.dispatcher.timeSource,
- machine,
- hypervisor,
- optimize = optimize,
+ flowGraph,
+ spec.model,
+ spec.cpuPowerModel,
)
require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt
index 58d3a8c2..2e76478e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt
@@ -23,9 +23,7 @@
package org.opendc.compute.simulator.provisioner
import org.opendc.common.Dispatcher
-import org.opendc.compute.simulator.MutableServiceRegistry
import org.opendc.compute.simulator.ServiceRegistry
-import org.opendc.compute.simulator.ServiceRegistryImpl
import java.util.ArrayDeque
import java.util.SplittableRandom
@@ -47,7 +45,7 @@ public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable {
object : ProvisioningContext {
override val dispatcher: Dispatcher = dispatcher
override val seeder: SplittableRandom = SplittableRandom(seed)
- override val registry: MutableServiceRegistry = ServiceRegistryImpl()
+ override val registry: ServiceRegistry = ServiceRegistry()
override fun toString(): String = "Provisioner.ProvisioningContext"
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt
index 1788c8e2..20c441c4 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.simulator.provisioner
import org.opendc.common.Dispatcher
-import org.opendc.compute.simulator.MutableServiceRegistry
+import org.opendc.compute.simulator.ServiceRegistry
import java.util.SplittableRandom
import java.util.random.RandomGenerator
@@ -44,7 +44,7 @@ public interface ProvisioningContext {
public val seeder: RandomGenerator
/**
- * A [MutableServiceRegistry] where the provisioned services are registered.
+ * A [ServiceRegistry] where the provisioned services are registered.
*/
- public val registry: MutableServiceRegistry
+ public val registry: ServiceRegistry
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt
new file mode 100644
index 00000000..f0a2c3b4
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A generic scheduler interface used by the [ComputeService] to select hosts to place [ServiceTask]s on.
+ */
+public interface ComputeScheduler {
+ /**
+ * Register the specified [host] to be used for scheduling.
+ */
+ public fun addHost(host: HostView)
+
+ /**
+ * Remove the specified [host] to be removed from the scheduling pool.
+ */
+ public fun removeHost(host: HostView)
+
+ /**
+ * Select a host for the specified [task].
+ *
+ * @param task The server to select a host for.
+ * @return The host to schedule the server on or `null` if no server is available.
+ */
+ public fun select(task: ServiceTask): HostView?
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt
new file mode 100644
index 00000000..ec3aedcb
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+
+package org.opendc.compute.simulator.scheduler
+
+import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
+import org.opendc.compute.simulator.scheduler.filters.RamFilter
+import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
+import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.simulator.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.simulator.scheduler.weights.RamWeigher
+import org.opendc.compute.simulator.scheduler.weights.VCpuWeigher
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
+
+public enum class ComputeSchedulerEnum {
+ Mem,
+ MemInv,
+ CoreMem,
+ CoreMemInv,
+ ActiveServers,
+ ActiveServersInv,
+ ProvisionedCores,
+ ProvisionedCoresInv,
+ Random,
+ Replay,
+}
+
+public fun createComputeScheduler(
+ name: String,
+ seeder: RandomGenerator,
+ placements: Map<String, String> = emptyMap(),
+): ComputeScheduler {
+ return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder, placements)
+}
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+public fun createComputeScheduler(
+ name: ComputeSchedulerEnum,
+ seeder: RandomGenerator,
+ placements: Map<String, String> = emptyMap(),
+): ComputeScheduler {
+ val cpuAllocationRatio = 1.0
+ val ramAllocationRatio = 1.5
+ return when (name) {
+ ComputeSchedulerEnum.Mem ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0)),
+ )
+ ComputeSchedulerEnum.MemInv ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0)),
+ )
+ ComputeSchedulerEnum.CoreMem ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
+ )
+ ComputeSchedulerEnum.CoreMemInv ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0)),
+ )
+ ComputeSchedulerEnum.ActiveServers ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0)),
+ )
+ ComputeSchedulerEnum.ActiveServersInv ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0)),
+ )
+ ComputeSchedulerEnum.ProvisionedCores ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)),
+ )
+ ComputeSchedulerEnum.ProvisionedCoresInv ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)),
+ )
+ ComputeSchedulerEnum.Random ->
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = SplittableRandom(seeder.nextLong()),
+ )
+ ComputeSchedulerEnum.Replay -> ReplayScheduler(placements)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt
new file mode 100644
index 00000000..9fd3a862
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler
+
+import org.opendc.compute.simulator.scheduler.filters.HostFilter
+import org.opendc.compute.simulator.scheduler.weights.HostWeigher
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
+import kotlin.math.min
+
+/**
+ * A [ComputeScheduler] implementation that uses filtering and weighing passes to select
+ * the host to schedule a [ServiceTask] on.
+ *
+ * This implementation is based on the filter scheduler from OpenStack Nova.
+ * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ *
+ * @param filters The list of filters to apply when searching for an appropriate host.
+ * @param weighers The list of weighers to apply when searching for an appropriate host.
+ * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
+ * @param random A [RandomGenerator] instance for selecting
+ */
+public class FilterScheduler(
+ private val filters: List<HostFilter>,
+ private val weighers: List<HostWeigher>,
+ private val subsetSize: Int = 1,
+ private val random: RandomGenerator = SplittableRandom(0),
+) : ComputeScheduler {
+ /**
+ * The pool of hosts available to the scheduler.
+ */
+ private val hosts = mutableListOf<HostView>()
+
+ init {
+ require(subsetSize >= 1) { "Subset size must be one or greater" }
+ }
+
+ override fun addHost(host: HostView) {
+ hosts.add(host)
+ }
+
+ override fun removeHost(host: HostView) {
+ hosts.remove(host)
+ }
+
+ override fun select(task: ServiceTask): HostView? {
+ val hosts = hosts
+ val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } }
+
+ val subset =
+ if (weighers.isNotEmpty()) {
+ val results = weighers.map { it.getWeights(filteredHosts, task) }
+ val weights = DoubleArray(filteredHosts.size)
+
+ for (result in results) {
+ val min = result.min
+ val range = (result.max - min)
+
+ // Skip result if all weights are the same
+ if (range == 0.0) {
+ continue
+ }
+
+ val multiplier = result.multiplier
+ val factor = multiplier / range
+
+ for ((i, weight) in result.weights.withIndex()) {
+ weights[i] += factor * (weight - min)
+ }
+ }
+
+ weights.indices
+ .asSequence()
+ .sortedByDescending { weights[it] }
+ .map { filteredHosts[it] }
+ .take(subsetSize)
+ .toList()
+ } else {
+ filteredHosts
+ }
+
+ // fixme: currently finding no matching hosts can result in an error
+ return when (val maxSize = min(subsetSize, subset.size)) {
+ 0 -> null
+ 1 -> subset[0]
+ else -> subset[random.nextInt(maxSize)]
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt
new file mode 100644
index 00000000..43e366d9
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler
+
+import mu.KotlinLogging
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * Policy replaying VM-cluster assignment.
+ *
+ * Within each cluster, the active servers on each node determine which node gets
+ * assigned the VM image.
+ */
+public class ReplayScheduler(private val vmPlacements: Map<String, String>) : ComputeScheduler {
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pool of hosts available to the scheduler.
+ */
+ private val hosts = mutableListOf<HostView>()
+
+ override fun addHost(host: HostView) {
+ hosts.add(host)
+ }
+
+ override fun removeHost(host: HostView) {
+ hosts.remove(host)
+ }
+
+ override fun select(task: ServiceTask): HostView? {
+ val clusterName =
+ vmPlacements[task.name]
+ ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${task.name}")
+ val machinesInCluster = hosts.filter { it.host.getName().contains(clusterName) }
+
+ if (machinesInCluster.isEmpty()) {
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${task.name}, assigning randomly." }
+ return hosts.maxByOrNull { it.availableMemory }
+ }
+
+ return machinesInCluster.maxByOrNull { it.availableMemory }
+ ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign")
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt
new file mode 100644
index 00000000..99a9390e
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/ComputeFilter.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.host.HostState
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostFilter] that filters on active hosts.
+ */
+public class ComputeFilter : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ val result = host.host.getState() == HostState.UP
+ return result
+ }
+
+ override fun toString(): String = "ComputeFilter"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/DifferentHostFilter.kt
index 412da37f..279a2717 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/DifferentHostFilter.kt
@@ -20,27 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.internal
+package org.opendc.compute.simulator.scheduler.filters
-import org.opendc.compute.api.Task
-import org.opendc.compute.simulator.SimMetaWorkloadMapper
-import org.opendc.compute.simulator.SimWorkloadMapper
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.compute.workload.SimWorkloads
-import java.time.Duration
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+import java.util.UUID
/**
- * A [SimWorkloadMapper] to introduces a boot delay of 1 ms. This object exists to retain the old behavior while
- * introducing the possibility of adding custom boot delays.
+ * A [HostFilter] that ensures an instance is scheduled on a different host from a set of instances.
*/
-internal object DefaultWorkloadMapper : SimWorkloadMapper {
- private val delegate = SimMetaWorkloadMapper()
-
- override fun createWorkload(task: Task): SimWorkload {
- val workload = delegate.createWorkload(task)
-
- // FIXME: look at connecting this to frontend. This does currently not work correctly
- val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(0), 1.0, 0L, 0L)
- return SimWorkloads.chain(bootWorkload, workload)
+public class DifferentHostFilter : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ @Suppress("UNCHECKED_CAST")
+ val affinityUUIDs = task.meta["scheduler_hint:different_host"] as? Set<UUID> ?: return true
+ return host.host.getInstances().none { it.uid in affinityUUIDs }
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt
new file mode 100644
index 00000000..bb9c1cbf
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/HostFilter.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A filter used by the [FilterScheduler] to filter hosts.
+ */
+public fun interface HostFilter {
+ /**
+ * Test whether the specified [host] should be included in the selection
+ * for scheduling the specified [task].
+ */
+ public fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt
new file mode 100644
index 00000000..53d68acf
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/InstanceCountFilter.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostFilter] that filters hosts based on the number of instances on the host.
+ *
+ * @param limit The maximum number of instances on the host.
+ */
+public class InstanceCountFilter(private val limit: Int) : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ return host.instanceCount < limit
+ }
+
+ override fun toString(): String = "InstanceCountFilter[limit=$limit]"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt
new file mode 100644
index 00000000..0b570d52
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/RamFilter.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostFilter] that filters hosts based on the memory requirements of a [ServiceTask] and the RAM available on the host.
+ *
+ * @param allocationRatio Virtual RAM to physical RAM allocation ratio.
+ */
+public class RamFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ val requestedMemory = task.flavor.memorySize
+ val availableMemory = host.availableMemory
+ val memoryCapacity = host.host.getModel().memoryCapacity
+
+ // Do not allow an instance to overcommit against itself, only against
+ // other instances.
+ if (requestedMemory > memoryCapacity) {
+ return false
+ }
+
+ val limit = memoryCapacity * allocationRatio
+ val used = memoryCapacity - availableMemory
+ val usable = limit - used
+
+ val result = usable >= requestedMemory
+ return result
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt
new file mode 100644
index 00000000..761b125d
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/SameHostFilter.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+import java.util.UUID
+
+/**
+ * A [HostFilter] that ensures an instance is scheduled on the same host as all other instances in a set of instances.
+ */
+public class SameHostFilter : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ @Suppress("UNCHECKED_CAST")
+ val affinityUUIDs = task.meta["scheduler_hint:same_host"] as? Set<UUID> ?: return true
+ return host.host.getInstances().any { it.uid in affinityUUIDs }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt
new file mode 100644
index 00000000..256caa94
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [ServiceTask] and the available
+ * capacity on the host.
+ */
+public class VCpuCapacityFilter : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double
+ val availableCapacity = host.host.getModel().cpuCapacity
+
+ return requiredCapacity == null || availableCapacity >= (requiredCapacity / task.flavor.coreCount)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt
new file mode 100644
index 00000000..c179a7bf
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuFilter.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.filters
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostFilter] that filters hosts based on the vCPU requirements of a [ServiceTask] and the available vCPUs on the host.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
+ */
+public class VCpuFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(
+ host: HostView,
+ task: ServiceTask,
+ ): Boolean {
+ val requested = task.flavor.coreCount
+ val totalCores = host.host.getModel().coreCount
+ val limit = totalCores * allocationRatio
+
+ // Do not allow an instance to overcommit against itself, only against other instances
+ if (requested > totalCores) {
+ return false
+ }
+
+ val availableCores = limit - host.provisionedCores
+ return availableCores >= requested
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt
new file mode 100644
index 00000000..b6c43c10
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available core memory, and a negative number will result in the scheduler preferring hosts with less available core
+ * memory.
+ */
+public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+ override fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double {
+ return host.availableMemory.toDouble()
+ }
+
+ override fun toString(): String = "CoreRamWeigher"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt
new file mode 100644
index 00000000..c1e0c590
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/HostWeigher.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
+ */
+public interface HostWeigher {
+ /**
+ * The multiplier for the weigher.
+ */
+ public val multiplier: Double
+
+ /**
+ * Obtain the weight of the specified [host] when scheduling the specified [ServiceTask].
+ */
+ public fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double
+
+ /**
+ * Obtain the weights for [hosts] when scheduling the specified [task].
+ */
+ public fun getWeights(
+ hosts: List<HostView>,
+ task: ServiceTask,
+ ): Result {
+ val weights = DoubleArray(hosts.size)
+ var min = Double.MAX_VALUE
+ var max = Double.MIN_VALUE
+
+ for ((i, host) in hosts.withIndex()) {
+ val weight = getWeight(host, task)
+ weights[i] = weight
+ min = kotlin.math.min(min, weight)
+ max = kotlin.math.max(max, weight)
+ }
+
+ return Result(weights, min, max, multiplier)
+ }
+
+ /**
+ * A result returned by the weigher.
+ *
+ * @param weights The weights returned by the weigher.
+ * @param min The minimum weight returned.
+ * @param max The maximum weight returned.
+ * @param multiplier The weight multiplier to use.
+ */
+ public class Result(
+ public val weights: DoubleArray,
+ public val min: Double,
+ public val max: Double,
+ public val multiplier: Double,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt
new file mode 100644
index 00000000..9277c1ae
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/InstanceCountWeigher.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the number of instances on the host.
+ */
+public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+ override fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double {
+ return host.instanceCount.toDouble()
+ }
+
+ override fun toString(): String = "InstanceCountWeigher"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt
new file mode 100644
index 00000000..1cbfea59
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/RamWeigher.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available memory, and a negative number will result in the scheduler preferring hosts with less memory.
+ */
+public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+ override fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double {
+ return host.availableMemory.toDouble()
+ }
+
+ override fun toString(): String = "RamWeigher"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt
new file mode 100644
index 00000000..4f52e11a
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuCapacityWeigher.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity.
+ */
+public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+ override fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double {
+ val model = host.host.getModel()
+ val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double ?: 0.0
+ return model.cpuCapacity - requiredCapacity / task.flavor.coreCount
+ }
+
+ override fun toString(): String = "VCpuWeigher"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt
new file mode 100644
index 00000000..3f9a7f03
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/VCpuWeigher.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.weights
+
+import org.opendc.compute.simulator.service.HostView
+import org.opendc.compute.simulator.service.ServiceTask
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
+ */
+public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher {
+ init {
+ require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" }
+ }
+
+ override fun getWeight(
+ host: HostView,
+ task: ServiceTask,
+ ): Double {
+ return allocationRatio - host.provisionedCores
+ }
+
+ override fun toString(): String = "VCpuWeigher"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
new file mode 100644
index 00000000..d5fb991d
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
@@ -0,0 +1,664 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.carbon.CarbonTrace
+import org.opendc.compute.simulator.host.SimHost
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.service.ServiceTask
+import org.opendc.compute.simulator.telemetry.table.HostInfo
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.telemetry.table.TaskInfo
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
+ * export interval.
+ *
+ * @param dispatcher A [Dispatcher] for scheduling the future events.
+ * @param service The [ComputeService] to monitor.
+ * @param monitor The monitor to export the metrics to.
+ * @param exportInterval The export interval.
+ */
+public class ComputeMetricReader(
+ dispatcher: Dispatcher,
+ private val service: ComputeService,
+ private val monitor: ComputeMonitor,
+ private val exportInterval: Duration = Duration.ofMinutes(5),
+ private val startTime: Duration = Duration.ofMillis(0),
+ private val carbonTrace: CarbonTrace = CarbonTrace(null),
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
+ private val clock = dispatcher.timeSource
+
+ /**
+ * Aggregator for service metrics.
+ */
+ private val serviceTableReader =
+ ServiceTableReaderImpl(
+ service,
+ startTime,
+ )
+
+ private var loggCounter = 0
+
+ /**
+ * Mapping from [Host] instances to [HostTableReaderImpl]
+ */
+ private val hostTableReaders = mutableMapOf<SimHost, HostTableReaderImpl>()
+
+ /**
+ * Mapping from [Task] instances to [TaskTableReaderImpl]
+ */
+ private val taskTableReaders = mutableMapOf<ServiceTask, TaskTableReaderImpl>()
+
+ /**
+ * The background job that is responsible for collecting the metrics every cycle.
+ */
+ private val job =
+ scope.launch {
+ val intervalMs = exportInterval.toMillis()
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ loggState()
+ }
+ } finally {
+ loggState()
+
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
+ }
+ }
+
+ private fun loggState() {
+ loggCounter++
+ try {
+ val now = this.clock.instant()
+
+ for (host in this.service.hosts) {
+ val reader =
+ this.hostTableReaders.computeIfAbsent(host) {
+ HostTableReaderImpl(
+ it,
+ startTime,
+ carbonTrace,
+ )
+ }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ for (task in this.service.tasks) {
+ val reader =
+ this.taskTableReaders.computeIfAbsent(task) {
+ TaskTableReaderImpl(
+ service,
+ it,
+ startTime,
+ )
+ }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ for (task in this.service.tasksToRemove) {
+ task.delete()
+ }
+ this.service.clearTasksToRemove()
+
+ this.serviceTableReader.record(now)
+ monitor.record(this.serviceTableReader.copy())
+
+ if (loggCounter >= 100) {
+ var loggString = "\n\t\t\t\t\tMetrics after ${now.toEpochMilli() / 1000 / 60 / 60} hours:\n"
+ loggString += "\t\t\t\t\t\tTasks Total: ${this.serviceTableReader.tasksTotal}\n"
+ loggString += "\t\t\t\t\t\tTasks Active: ${this.serviceTableReader.tasksActive}\n"
+ loggString += "\t\t\t\t\t\tTasks Pending: ${this.serviceTableReader.tasksPending}\n"
+ loggString += "\t\t\t\t\t\tTasks Completed: ${this.serviceTableReader.tasksCompleted}\n"
+ loggString += "\t\t\t\t\t\tTasks Terminated: ${this.serviceTableReader.tasksTerminated}\n"
+
+ this.logger.warn { loggString }
+ loggCounter = 0
+ }
+ } catch (cause: Throwable) {
+ this.logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+
+ override fun close() {
+ job.cancel()
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ private class ServiceTableReaderImpl(
+ private val service: ComputeService,
+ private val startTime: Duration = Duration.ofMillis(0),
+ ) : ServiceTableReader {
+ override fun copy(): ServiceTableReader {
+ val newServiceTable =
+ ServiceTableReaderImpl(
+ service,
+ )
+ newServiceTable.setValues(this)
+
+ return newServiceTable
+ }
+
+ override fun setValues(table: ServiceTableReader) {
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _hostsUp = table.hostsUp
+ _hostsDown = table.hostsDown
+ _tasksTotal = table.tasksTotal
+ _tasksPending = table.tasksPending
+ _tasksActive = table.tasksActive
+ _tasksCompleted = table.tasksCompleted
+ _tasksTerminated = table.tasksTerminated
+ _attemptsSuccess = table.attemptsSuccess
+ _attemptsFailure = table.attemptsFailure
+ }
+
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ private var _timestampAbsolute: Instant = Instant.MIN
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val tasksTotal: Int
+ get() = _tasksTotal
+ private var _tasksTotal = 0
+
+ override val tasksPending: Int
+ get() = _tasksPending
+ private var _tasksPending = 0
+
+ override val tasksCompleted: Int
+ get() = _tasksCompleted
+ private var _tasksCompleted = 0
+
+ override val tasksActive: Int
+ get() = _tasksActive
+ private var _tasksActive = 0
+
+ override val tasksTerminated: Int
+ get() = _tasksTerminated
+ private var _tasksTerminated = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _tasksTotal = stats.tasksTotal
+ _tasksPending = stats.tasksPending
+ _tasksCompleted = stats.tasksCompleted
+ _tasksActive = stats.tasksActive
+ _tasksTerminated = stats.tasksTerminated
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostTableReaderImpl(
+ host: SimHost,
+ private val startTime: Duration = Duration.ofMillis(0),
+ private val carbonTrace: CarbonTrace = CarbonTrace(null),
+ ) : HostTableReader {
+ override fun copy(): HostTableReader {
+ val newHostTable =
+ HostTableReaderImpl(_host)
+ newHostTable.setValues(this)
+
+ return newHostTable
+ }
+
+ override fun setValues(table: HostTableReader) {
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _guestsTerminated = table.guestsTerminated
+ _guestsRunning = table.guestsRunning
+ _guestsError = table.guestsError
+ _guestsInvalid = table.guestsInvalid
+ _cpuLimit = table.cpuLimit
+ _cpuDemand = table.cpuDemand
+ _cpuUsage = table.cpuUsage
+ _cpuUtilization = table.cpuUtilization
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _powerDraw = table.powerDraw
+ _energyUsage = table.energyUsage
+ _carbonIntensity = table.carbonIntensity
+ _carbonEmission = table.carbonEmission
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _bootTime = table.bootTime
+ _bootTimeAbsolute = table.bootTimeAbsolute
+ }
+
+ private val _host = host
+
+ override val host: HostInfo =
+ HostInfo(
+ host.getUid().toString(),
+ host.getName(),
+ "x86",
+ host.getModel().coreCount,
+ host.getModel().cpuCapacity,
+ host.getModel().memoryCapacity,
+ )
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+ private var _timestampAbsolute = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Float
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0f
+
+ override val cpuUsage: Float
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0f
+
+ override val cpuDemand: Float
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0f
+
+ override val cpuUtilization: Float
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0f
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerDraw: Float
+ get() = _powerDraw
+ private var _powerDraw = 0.0f
+
+ override val energyUsage: Float
+ get() = _energyUsage - previousEnergyUsage
+ private var _energyUsage = 0.0f
+ private var previousEnergyUsage = 0.0f
+
+ override val carbonIntensity: Float
+ get() = _carbonIntensity
+ private var _carbonIntensity = 0.0f
+
+ override val carbonEmission: Float
+ get() = _carbonEmission
+ private var _carbonEmission = 0.0f
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val bootTimeAbsolute: Instant?
+ get() = _bootTimeAbsolute
+ private var _bootTimeAbsolute: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerDraw = hostSysStats.powerDraw
+ _energyUsage = hostSysStats.energyUsage
+ _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute)
+
+ _carbonEmission = carbonIntensity * (energyUsage / 3600000.0f) // convert energy usage from J to kWh
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ _bootTime = hostSysStats.bootTime + startTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousEnergyUsage = _energyUsage
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0f
+ _cpuUsage = 0.0f
+ _cpuDemand = 0.0f
+ _cpuUtilization = 0.0f
+
+ _powerDraw = 0.0f
+ _energyUsage = 0.0f
+ _carbonIntensity = 0.0f
+ _carbonEmission = 0.0f
+ }
+ }
+
+ /**
+ * An aggregator for task metrics before they are reported.
+ */
+ private class TaskTableReaderImpl(
+ private val service: ComputeService,
+ private val task: ServiceTask,
+ private val startTime: Duration = Duration.ofMillis(0),
+ ) : TaskTableReader {
+ override fun copy(): TaskTableReader {
+ val newTaskTable =
+ TaskTableReaderImpl(
+ service,
+ task,
+ )
+ newTaskTable.setValues(this)
+
+ return newTaskTable
+ }
+
+ override fun setValues(table: TaskTableReader) {
+ host = table.host
+
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _cpuLimit = table.cpuLimit
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _provisionTime = table.provisionTime
+ _bootTime = table.bootTime
+ _bootTimeAbsolute = table.bootTimeAbsolute
+
+ _creationTime = table.creationTime
+ _finishTime = table.finishTime
+
+ _taskState = table.taskState
+ }
+
+ /**
+ * The static information about this task.
+ */
+ override val taskInfo =
+ TaskInfo(
+ task.uid.toString(),
+ task.name,
+ "vm",
+ "x86",
+ task.flavor.coreCount,
+ task.flavor.memorySize,
+ )
+
+ /**
+ * The [HostInfo] of the host on which the task is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: SimHost? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ private var _timestampAbsolute = Instant.MIN
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val creationTime: Instant?
+ get() = _creationTime
+ private var _creationTime: Instant? = null
+
+ override val finishTime: Instant?
+ get() = _finishTime
+ private var _finishTime: Instant? = null
+
+ override val cpuLimit: Float
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0f
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val bootTimeAbsolute: Instant?
+ get() = _bootTimeAbsolute
+ private var _bootTimeAbsolute: Instant? = null
+
+ override val taskState: TaskState?
+ get() = _taskState
+ private var _taskState: TaskState? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val newHost = service.lookupHost(task)
+ if (newHost != null && newHost.getUid() != _host?.getUid()) {
+ _host = newHost
+ host =
+ HostInfo(
+ newHost.getUid().toString(),
+ newHost.getName(),
+ "x86",
+ newHost.getModel().coreCount,
+ newHost.getModel().cpuCapacity,
+ newHost.getModel().memoryCapacity,
+ )
+ }
+
+ val cpuStats = _host?.getCpuStats(task)
+ val sysStats = _host?.getSystemStats(task)
+
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ _cpuLimit = cpuStats?.capacity ?: 0.0f
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = task.launchedAt
+ _bootTime = sysStats?.bootTime
+ _creationTime = task.createdAt
+ _finishTime = task.finishedAt
+
+ _taskState = task.state
+
+ if (sysStats != null) {
+ _bootTimeAbsolute = sysStats.bootTime + startTime
+ } else {
+ _bootTimeAbsolute = null
+ }
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0f
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt
new file mode 100644
index 00000000..534bcc09
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry
+
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+
+/**
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
+ */
+public interface ComputeMonitor {
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: TaskTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: HostTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: ServiceTableReader) {}
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
new file mode 100644
index 00000000..3f220ad1
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.buildClassSerialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.encoding.encodeStructure
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.JsonElement
+import kotlinx.serialization.json.jsonObject
+import org.opendc.common.logger.logger
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import org.opendc.trace.util.parquet.exporter.ColListSerializer
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.columnSerializer
+
+/**
+ * Aggregates the necessary settings to personalize the output
+ * parquet files for compute workloads.
+ *
+ * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
+ * @param[taskExportColumns] the columns that will be included in the `task.parquet` raw output file.
+ * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
+ */
+@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
+public data class ComputeExportConfig(
+ public val hostExportColumns: Set<ExportColumn<HostTableReader>>,
+ public val taskExportColumns: Set<ExportColumn<TaskTableReader>>,
+ public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>,
+) {
+ public constructor(
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>,
+ taskExportColumns: Collection<ExportColumn<TaskTableReader>>,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>,
+ ) : this(
+ hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS,
+ taskExportColumns.toSet() + DfltTaskExportColumns.BASE_EXPORT_COLUMNS,
+ serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS,
+ )
+
+ /**
+ * @return formatted string representing the export config.
+ */
+ public fun fmt(): String =
+ """
+ | === Compute Export Config ===
+ | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Task columns : ${taskExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')}
+ """.trimIndent()
+
+ public companion object {
+ internal val LOG by logger()
+
+ /**
+ * Force the jvm to load the default [ExportColumn]s relevant to compute export,
+ * so that they are available for deserialization.
+ */
+ public fun loadDfltColumns() {
+ DfltHostExportColumns
+ DfltTaskExportColumns
+ DfltServiceExportColumns
+ }
+
+ /**
+ * Config that includes all columns defined in [DfltHostExportColumns],
+ * [DfltTaskExportColumns], [DfltServiceExportColumns] among all other loaded
+ * columns for [HostTableReader], [TaskTableReader] and [ServiceTableReader].
+ */
+ public val ALL_COLUMNS: ComputeExportConfig by lazy {
+ ComputeExportConfig.Companion.loadDfltColumns()
+ ComputeExportConfig(
+ hostExportColumns = ExportColumn.getAllLoadedColumns(),
+ taskExportColumns = ExportColumn.getAllLoadedColumns(),
+ serviceExportColumns = ExportColumn.getAllLoadedColumns(),
+ )
+ }
+
+ /**
+ * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs.
+ *
+ * This serializer makes use of reified column serializers for the 2 properties.
+ */
+ internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> {
+ override val descriptor: SerialDescriptor =
+ buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") {
+ element(
+ "hostExportColumns",
+ ListSerializer(columnSerializer<HostTableReader>()).descriptor,
+ )
+ element(
+ "taskExportColumns",
+ ListSerializer(columnSerializer<TaskTableReader>()).descriptor,
+ )
+ element(
+ "serviceExportColumns",
+ ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
+ )
+ }
+
+ override fun deserialize(decoder: Decoder): ComputeExportConfig {
+ val jsonDec =
+ (decoder as? JsonDecoder) ?: let {
+ // Basically a recursive call with a JsonDecoder.
+ return json.decodeFromString(decoder.decodeString().trim('"'))
+ }
+
+ // Loads the default columns so that they are available for deserialization.
+ ComputeExportConfig.Companion.loadDfltColumns()
+ val elem = jsonDec.decodeJsonElement().jsonObject
+
+ val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
+ val taskFields: List<ExportColumn<TaskTableReader>> = elem["taskExportColumns"].toFieldList()
+ val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()
+
+ return ComputeExportConfig(
+ hostExportColumns = hostFields,
+ taskExportColumns = taskFields,
+ serviceExportColumns = serviceFields,
+ )
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: ComputeExportConfig,
+ ) {
+ encoder.encodeStructure(ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor) {
+ encodeSerializableElement(
+ ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ 0,
+ ColListSerializer(columnSerializer<HostTableReader>()),
+ value.hostExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ 1,
+ ColListSerializer(columnSerializer<TaskTableReader>()),
+ value.taskExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ 2,
+ ColListSerializer(columnSerializer<ServiceTableReader>()),
+ value.serviceExportColumns.toList(),
+ )
+ }
+ }
+ }
+ }
+}
+
+private val json = Json { ignoreUnknownKeys = true }
+
+private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> =
+ this?.let {
+ json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it)
+ }?.ifEmpty {
+ ComputeExportConfig.Companion.LOG.warn(
+ "deserialized list of export columns for exportable ${T::class.simpleName} " +
+ "produced empty list, falling back to all loaded columns",
+ )
+ ExportColumn.getAllLoadedColumns<T>()
+ } ?: ExportColumn.getAllLoadedColumns<T>()
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt
new file mode 100644
index 00000000..1b76da6b
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltHostExportColumns.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solves ambiguity for field
+ * names that are included in more than 1 exportable.
+ *
+ * Additionally, it allows to load all the fields at once by just its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the jvm.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltHostExportColumns
+ * ...
+ * ```
+ */
+public object DfltHostExportColumns {
+ public val TIMESTAMP: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOST_ID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { Binary.fromString(it.host.id) }
+
+ public val HOST_NAME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_name"),
+ ) { Binary.fromString(it.host.name) }
+
+ public val CPU_COUNT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("core_count"),
+ ) { it.host.coreCount }
+
+ public val MEM_CAPACITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.host.memCapacity }
+
+ public val GUESTS_TERMINATED: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_terminated"),
+ ) { it.guestsTerminated }
+
+ public val GUESTS_RUNNING: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_running"),
+ ) { it.guestsRunning }
+
+ public val GUESTS_ERROR: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_error"),
+ ) { it.guestsError }
+
+ public val GUESTS_INVALID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_invalid"),
+ ) { it.guestsInvalid }
+
+ public val CPU_LIMIT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_usage"),
+ ) { it.cpuUsage }
+
+ public val CPU_DEMAND: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_demand"),
+ ) { it.cpuDemand }
+
+ public val CPU_UTILIZATION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_utilization"),
+ ) { it.cpuUtilization }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val POWER_DRAW: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("power_draw"),
+ ) { it.powerDraw }
+
+ public val ENERGY_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("energy_usage"),
+ ) { it.energyUsage }
+
+ public val CARBON_INTENSITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("carbon_intensity"),
+ ) { it.carbonIntensity }
+
+ public val CARBON_EMISSION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("carbon_emission"),
+ ) { it.carbonEmission }
+
+ public val UP_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val BOOT_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt
new file mode 100644
index 00000000..aa08e8ff
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltServiceExportColumns.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solves ambiguity for field
+ * names that are included in more than 1 exportable.
+ *
+ * Additionally, it allows to load all the fields at once by just its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the jvm.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltServiceExportColumns
+ * ...
+ * ```
+ */
+public object DfltServiceExportColumns {
+ public val TIMESTAMP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOSTS_UP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("hosts_up"),
+ ) { it.hostsUp }
+
+ public val TASKS_PENDING: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("tasks_pending"),
+ ) { it.tasksPending }
+
+ public val TASKS_TOTAL: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("tasks_total"),
+ ) { it.tasksTotal }
+
+ public val TASKS_ACTIVE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("tasks_active"),
+ ) { it.tasksActive }
+
+ public val TASKS_COMPLETED: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("tasks_completed"),
+ ) { it.tasksCompleted }
+
+ public val TASKS_FAILED: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("tasks_terminated"),
+ ) { it.tasksTerminated }
+
+ public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_success"),
+ ) { it.attemptsSuccess }
+
+ public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_failure"),
+ ) { it.attemptsFailure }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
new file mode 100644
index 00000000..6658e444
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solves ambiguity for field
+ * names that are included in more than 1 exportable
+ *
+ * Additionally, it allows to load all the fields at once by just its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the jvm.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltTaskExportColumns
+ * ...
+ * ```
+ */
+public object DfltTaskExportColumns {
+ public val TIMESTAMP: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val TASK_ID: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("task_id"),
+ ) { Binary.fromString(it.taskInfo.id) }
+
+ public val HOST_ID: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field =
+ Types.optional(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { it.host?.id?.let { Binary.fromString(it) } }
+
+ public val TASK_NAME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("task_name"),
+ ) { Binary.fromString(it.taskInfo.name) }
+
+ public val CPU_COUNT: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("cpu_count"),
+ ) { it.taskInfo.cpuCount }
+
+ public val MEM_CAPACITY: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.taskInfo.memCapacity }
+
+ public val CPU_LIMIT: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val UP_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val PROVISION_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("provision_time"),
+ ) { it.provisionTime?.toEpochMilli() }
+
+ public val BOOT_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val CREATION_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("creation_time"),
+ ) { it.creationTime?.toEpochMilli() }
+
+ public val FINISH_TIME: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("finish_time"),
+ ) { it.finishTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ public val TASK_STATE: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field =
+ Types.optional(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("task_state"),
+ ) { Binary.fromString(it.taskState?.name) }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
new file mode 100644
index 00000000..4cd920c4
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.Exporter
+import java.io.File
+
+/**
+ * A [ComputeMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetComputeMonitor(
+ private val hostExporter: Exporter<HostTableReader>,
+ private val taskExporter: Exporter<TaskTableReader>,
+ private val serviceExporter: Exporter<ServiceTableReader>,
+) : ComputeMonitor, AutoCloseable {
+ override fun record(reader: HostTableReader) {
+ hostExporter.write(reader)
+ }
+
+ override fun record(reader: TaskTableReader) {
+ taskExporter.write(reader)
+ }
+
+ override fun record(reader: ServiceTableReader) {
+ serviceExporter.write(reader)
+ }
+
+ override fun close() {
+ hostExporter.close()
+ taskExporter.close()
+ serviceExporter.close()
+ }
+
+ public companion object {
+ /**
+ * Overloaded constructor with [ComputeExportConfig] as parameter.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ computeExportConfig: ComputeExportConfig,
+ ): ParquetComputeMonitor =
+ invoke(
+ base = base,
+ partition = partition,
+ bufferSize = bufferSize,
+ hostExportColumns = computeExportConfig.hostExportColumns,
+ taskExportColumns = computeExportConfig.taskExportColumns,
+ serviceExportColumns = computeExportConfig.serviceExportColumns,
+ )
+
+ /**
+ * Constructor that loads default [ExportColumn]s defined in
+ * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltServiceExportColumns]
+ * in case optional parameters are omitted and all fields need to be retrieved.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null,
+ taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null,
+ ): ParquetComputeMonitor {
+ // Loads the fields in case they need to be retrieved if optional params are omitted.
+ ComputeExportConfig.loadDfltColumns()
+
+ return ParquetComputeMonitor(
+ hostExporter =
+ Exporter(
+ outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
+ columns = hostExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ taskExporter =
+ Exporter(
+ outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() },
+ columns = taskExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ serviceExporter =
+ Exporter(
+ outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
+ columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ )
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md
new file mode 100644
index 00000000..3baafed4
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/README.md
@@ -0,0 +1,70 @@
+### Summary
+Added output configuration, that can be defined in the scenario `.json` file, that allows to select which columns are to be included in the raw oputput files `host.parquet`, `task.parquet` and `service.parquet`.
+
+### Columns
+The 'default' columns are defined in `DfltHostExportcolumns`, `DfltTaskExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be definied anywhere (`ExportColumn<Exportable>`) and it is going to be deserializable as long as it is loaded by the jvm.
+
+### Deserialization
+Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in case-insensitive manner, either with `_` as in the name or with ` ` (blank space).
+
+###### E.g.:
+***column name*** = "cpuModel\_count"
+***default column regex*** = "\\s*(?:cpu_count|cpuModel count)\\s*" (case insensitive)
+***matches*** = "cpuModel\_count", "cpuModel count", "CpU/_cOuNt" etc.
+
+### JSON Schema
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+ "type": "object",
+ "properties": {
+ "hostExportColumns": { "type": "array" },
+ "taskExportColumns": { "type": "array" } ,
+ "serviceExportColumns": { "type": "array" } ,
+ "required": [ /* NONE REQUIRED */ ]
+ }
+ },
+ ...
+ "required": [
+ ...
+ // NOT REQUIRED
+ ]
+}
+```
+
+&nbsp;
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and error message is logged.
+- If an empty list of columns is provided or those that are provided were not deserializable, then all loaded columns for that `Exportable` are used, and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+
+### Example
+
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+ "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"],
+ "taskExportColumns": ["invalid-entry2"],
+ "serviceExportColumns": ["timestamp", "tasks_active", "tasks_pending"]
+ },
+ ...
+```
+
+```json
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable TaskTableReader produced empty list, falling back to all loaded columns
+10:51:56.584 [INFO] ScenariosSpec -
+| === Compute Export Config ===
+| Host columns : timestamp, timestamp_absolute, guests_invalid
+| Task columns : timestamp, timestamp_absolute, task_id, task_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute
+| Service columns : timestamp, tasks_active, tasks_pending
+
+```
+
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostInfo.kt
index 907f6acd..1f1b9522 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostInfo.kt
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator
-
-import org.opendc.compute.api.Task
-import org.opendc.simulator.compute.workload.SimWorkload
+package org.opendc.compute.simulator.telemetry.table
/**
- * A [SimWorkloadMapper] that maps a [Task] to a workload via the meta-data.
+ * Information about a host exposed to the telemetry service.
*/
-public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper {
- override fun createWorkload(task: Task): SimWorkload {
- return requireNotNull(task.meta[key] ?: task.image.meta[key]) as SimWorkload
- }
-}
+public data class HostInfo(
+ val id: String,
+ val name: String,
+ val arch: String,
+ val coreCount: Int,
+ val coreSpeed: Float,
+ val memCapacity: Long,
+)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt
new file mode 100644
index 00000000..5f09e7f5
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.trace.util.parquet.exporter.Exportable
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader : Exportable {
+ public fun copy(): HostTableReader
+
+ public fun setValues(table: HostTableReader)
+
+ /**
+ * The [HostInfo] of the host to which the row belongs to.
+ */
+ public val host: HostInfo
+
+ /**
+ * The timestamp of the current entry of the reader relative to the start of the workload.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestampAbsolute: Instant
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Float
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Float
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz)
+ */
+ public val cpuDemand: Float
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Float
+
+ /**
+ * The duration (in ms) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in ms) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in ms) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in ms) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power draw of the host in W.
+ */
+ public val powerDraw: Float
+
+ /**
+ * The total energy consumption of the host since last sample in J.
+ */
+ public val energyUsage: Float
+
+ /**
+ * The current carbon intensity of the host in gCO2 / kW.
+ */
+ public val carbonIntensity: Float
+
+ /**
+ * The current carbon emission since the last deadline in g.
+ */
+ public val carbonEmission: Float
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted relative to the start of the workload.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTimeAbsolute: Instant?
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt
new file mode 100644
index 00000000..16c38297
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceData.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for the compute service.
+ */
+public data class ServiceData(
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val tasksTotal: Int,
+ val tasksPending: Int,
+ val tasksActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsTerminated: Int,
+)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(
+ timestamp,
+ hostsUp,
+ hostsDown,
+ tasksTotal,
+ tasksPending,
+ tasksActive,
+ attemptsSuccess,
+ attemptsFailure,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt
new file mode 100644
index 00000000..690dfe0a
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.trace.util.parquet.exporter.Exportable
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a service trace entry.
+ */
+public interface ServiceTableReader : Exportable {
+ public fun copy(): ServiceTableReader
+
+ public fun setValues(table: ServiceTableReader)
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestampAbsolute: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of tasks that are registered with the compute service.
+ */
+ public val tasksTotal: Int
+
+ /**
+ * The number of tasks that are pending to be scheduled.
+ */
+ public val tasksPending: Int
+
+ /**
+ * The number of tasks that are currently active.
+ */
+ public val tasksActive: Int
+
+ /**
+ * The number of tasks that completed the tasks successfully
+ */
+ public val tasksCompleted: Int
+
+ /**
+ * The number of tasks that failed more times than allowed and are thus terminated
+ */
+ public val tasksTerminated: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt
new file mode 100644
index 00000000..6ff56541
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskInfo.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+/**
+ * Static information about a task exposed to the telemetry service.
+ */
+public data class TaskInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val cpuCount: Int,
+ val memCapacity: Long,
+)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
new file mode 100644
index 00000000..bc6a4edd
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.simulator.telemetry.parquet.DfltTaskExportColumns
+import org.opendc.trace.util.parquet.exporter.Exportable
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a task trace entry.
+ */
+public interface TaskTableReader : Exportable {
+ public fun copy(): TaskTableReader
+
+ public fun setValues(table: TaskTableReader)
+
+ /**
+ * The timestamp of the current entry of the reader relative to the start of the workload.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestampAbsolute: Instant
+
+ /**
+ * The [TaskInfo] of the task to which the row belongs to.
+ */
+ public val taskInfo: TaskInfo
+
+ /**
+ * The [HostInfo] of the host on which the task is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the task was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the task booted relative to the start of the workload.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The [Instant] at which the task booted.
+ */
+ public val bootTimeAbsolute: Instant?
+
+ /**
+ * The [Instant] at which the task booted relative to the start of the workload.
+ */
+ public val creationTime: Instant?
+
+ /**
+ * The [Instant] at which the task booted relative to the start of the workload.
+ */
+ public val finishTime: Instant?
+
+ /**
+ * The capacity of the CPUs of Host on which the task is running (in MHz).
+ */
+ public val cpuLimit: Float
+
+ /**
+ * The duration (in seconds) that a CPU was active in the task.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the task.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The state of the task
+ */
+ public val taskState: TaskState?
+}
+
+// Loads the default export fields for deserialization whenever this file is loaded.
+private val _ignore = DfltTaskExportColumns
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
deleted file mode 100644
index b5bc66a9..00000000
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.suspendCancellableCoroutine
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Task
-import org.opendc.compute.api.TaskState
-import org.opendc.compute.api.TaskWatcher
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.compute.model.Cpu
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimTrace
-import org.opendc.simulator.compute.workload.SimTraceFragment
-import org.opendc.simulator.flow2.FlowEngine
-import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory
-import org.opendc.simulator.kotlin.runSimulation
-import java.time.Instant
-import java.util.SplittableRandom
-import java.util.UUID
-import kotlin.coroutines.resume
-
-/**
- * Basic test-suite for the hypervisor.
- */
-internal class SimHostTest {
- private lateinit var machineModel: MachineModel
-
- @BeforeEach
- fun setUp() {
- machineModel =
- MachineModel(
- Cpu(
- 0,
- 2,
- 3200.0,
- "Intel",
- "Xeon",
- "amd64",
- ),
- // memory
- MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000 * 4),
- )
- }
-
- /**
- * Test a single virtual machine hosted by the hypervisor.
- */
- @Test
- fun testSingle() =
- runSimulation {
- val duration = 5 * 60L
-
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val machine = SimBareMetalMachine.create(graph, machineModel)
- val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
-
- val host =
- SimHost(
- uid = UUID.randomUUID(),
- name = "test",
- meta = emptyMap(),
- timeSource,
- machine,
- hypervisor,
- )
- val vmImage =
- MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to
- SimTrace.ofFragments(
- SimTraceFragment(0, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2),
- SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2),
- ).createWorkload(0),
- ),
- )
-
- val flavor = MockFlavor(2, 0)
-
- suspendCancellableCoroutine { cont ->
- host.addListener(
- object : HostListener {
- private var finished = 0
-
- override fun onStateChanged(
- host: Host,
- task: Task,
- newState: TaskState,
- ) {
- if (newState == TaskState.TERMINATED && ++finished == 1) {
- cont.resume(Unit)
- }
- }
- },
- )
- val server = MockTask(UUID.randomUUID(), "a", flavor, vmImage)
- host.spawn(server)
- host.start(server)
- }
-
- // Ensure last cycle is collected
-// delay(1000L * duration)
- host.close()
-
- val cpuStats = host.getCpuStats()
-
- assertAll(
- { assertEquals(450000, cpuStats.activeTime, "Active time does not match") },
- { assertEquals(750000, cpuStats.idleTime, "Idle time does not match") },
- { assertEquals(4688, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1200000, timeSource.millis()) },
- )
- }
-
- /**
- * Test overcommitting of resources by the hypervisor.
- */
- @Test
- fun testOvercommitted() =
- runSimulation {
- val duration = 5 * 60L
-
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val machine = SimBareMetalMachine.create(graph, machineModel)
- val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
-
- val host =
- SimHost(
- uid = UUID.randomUUID(),
- name = "test",
- meta = emptyMap(),
- timeSource,
- machine,
- hypervisor,
- )
- val vmImageA =
- MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to
- SimTrace.ofFragments(
- SimTraceFragment(0, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2),
- SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2),
- ).createWorkload(0),
- ),
- )
- val vmImageB =
- MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to
- SimTrace.ofFragments(
- SimTraceFragment(0, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2),
- SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2),
- ).createWorkload(0),
- ),
- )
-
- val flavor = MockFlavor(2, 0)
-
- coroutineScope {
- suspendCancellableCoroutine { cont ->
- host.addListener(
- object : HostListener {
- private var finished = 0
-
- override fun onStateChanged(
- host: Host,
- task: Task,
- newState: TaskState,
- ) {
- if (newState == TaskState.TERMINATED && ++finished == 2) {
- cont.resume(Unit)
- }
- }
- },
- )
- val serverA = MockTask(UUID.randomUUID(), "a", flavor, vmImageA)
- host.spawn(serverA)
- val serverB = MockTask(UUID.randomUUID(), "b", flavor, vmImageB)
- host.spawn(serverB)
-
- host.start(serverA)
- host.start(serverB)
- }
- }
-
- // Ensure last cycle is collected
- delay(1000L * duration)
- host.close()
-
- val cpuStats = host.getCpuStats()
-
- assertAll(
- { assertEquals(600000, cpuStats.activeTime, "Active time does not match") },
- { assertEquals(900000, cpuStats.idleTime, "Idle time does not match") },
- { assertEquals(309375, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1500000, timeSource.millis()) },
- )
- }
-
- /**
- * Test failure of the host.
- */
- @Test
- fun testFailure() =
- runSimulation {
- val duration = 5 * 60L
-
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val machine = SimBareMetalMachine.create(graph, machineModel)
- val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
- val host =
- SimHost(
- uid = UUID.randomUUID(),
- name = "test",
- meta = emptyMap(),
- timeSource,
- machine,
- hypervisor,
- )
- val image =
- MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to
- SimTrace.ofFragments(
- SimTraceFragment(0, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 1000, duration * 1000, 3200.0, 2),
- SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceFragment(duration * 3000, duration * 1000, 6500.0, 2),
- ).createWorkload(0),
- ),
- )
- val flavor = MockFlavor(2, 0)
- val server = MockTask(UUID.randomUUID(), "a", flavor, image)
-
- coroutineScope {
- host.spawn(server)
- host.start(server)
- delay(5000L)
- host.fail()
- delay(duration * 1000)
- host.recover()
-
- suspendCancellableCoroutine { cont ->
- host.addListener(
- object : HostListener {
- override fun onStateChanged(
- host: Host,
- task: Task,
- newState: TaskState,
- ) {
- if (newState == TaskState.TERMINATED) {
- cont.resume(Unit)
- }
- }
- },
- )
- }
- }
-
- host.close()
- // Ensure last cycle is collected
- delay(1000L * duration)
-
- val cpuStats = host.getCpuStats()
- val sysStats = host.getSystemStats()
- val guestSysStats = host.getSystemStats(server)
-
- assertAll(
- { assertEquals(755000, cpuStats.idleTime, "Idle time does not match") },
- { assertEquals(450000, cpuStats.activeTime, "Active time does not match") },
- { assertEquals(1205000, sysStats.uptime.toMillis(), "Uptime does not match") },
- { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") },
- { assertEquals(1205000, guestSysStats.uptime.toMillis(), "Guest uptime does not match") },
- { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") },
- )
- }
-
- private class MockFlavor(
- override val coreCount: Int,
- override val memorySize: Long,
- ) : Flavor {
- override val uid: UUID = UUID.randomUUID()
- override val name: String = "test"
- override val labels: Map<String, String> = emptyMap()
- override val meta: Map<String, Any> = emptyMap()
-
- override fun delete() {
- throw NotImplementedError()
- }
-
- override fun reload() {
- throw NotImplementedError()
- }
- }
-
- private class MockImage(
- override val uid: UUID,
- override val name: String,
- override val labels: Map<String, String>,
- override val meta: Map<String, Any>,
- ) : Image {
- override fun delete() {
- throw NotImplementedError()
- }
-
- override fun reload() {
- throw NotImplementedError()
- }
- }
-
- private class MockTask(
- override val uid: UUID,
- override val name: String,
- override val flavor: Flavor,
- override val image: Image,
- override val numFailures: Int = 10,
- ) : Task {
- override val labels: Map<String, String> = emptyMap()
-
- override val meta: Map<String, Any> = emptyMap()
-
- override val state: TaskState = TaskState.TERMINATED
-
- override val launchedAt: Instant? = null
-
- override fun start() {}
-
- override fun stop() {}
-
- override fun delete() {}
-
- override fun watch(watcher: TaskWatcher) {}
-
- override fun unwatch(watcher: TaskWatcher) {}
-
- override fun reload() {}
- }
-}