From 3363df4c72a064e590ca98f8e01832cfa4e15a3f Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 27 Aug 2024 13:48:46 +0200 Subject: Renamed input files and internally server is changed to task (#246) * Updated SimTrace to use a single ArrayDeque instead of three separate lists for deadline, cpuUsage, and coreCount * Renamed input files to tasks.parquet and fragments.parquet. Renamed server to task. OpenDC nows exports tasks.parquet instead of server.parquet --- .../kotlin/org/opendc/compute/api/ComputeClient.kt | 28 +- .../main/kotlin/org/opendc/compute/api/Flavor.kt | 6 +- .../main/kotlin/org/opendc/compute/api/Image.kt | 2 +- .../api/InsufficientServerCapacityException.kt | 31 -- .../api/InsufficientTaskCapacityException.kt | 31 ++ .../main/kotlin/org/opendc/compute/api/Server.kt | 74 ---- .../kotlin/org/opendc/compute/api/ServerState.kt | 53 --- .../kotlin/org/opendc/compute/api/ServerWatcher.kt | 39 -- .../src/main/kotlin/org/opendc/compute/api/Task.kt | 74 ++++ .../kotlin/org/opendc/compute/api/TaskState.kt | 53 +++ .../kotlin/org/opendc/compute/api/TaskWatcher.kt | 39 ++ .../failure/hostfault/StartStopHostFault.kt | 10 +- .../org/opendc/compute/service/ComputeService.java | 178 ++++----- .../org/opendc/compute/service/ServiceServer.java | 255 ------------ .../org/opendc/compute/service/ServiceTask.java | 255 ++++++++++++ .../org/opendc/compute/service/driver/Host.java | 52 +-- .../compute/service/driver/HostListener.java | 8 +- .../service/driver/telemetry/HostSystemStats.java | 2 +- .../compute/service/telemetry/SchedulerStats.java | 12 +- .../compute/service/scheduler/ComputeScheduler.kt | 10 +- .../compute/service/scheduler/FilterScheduler.kt | 10 +- .../compute/service/scheduler/ReplayScheduler.kt | 10 +- .../service/scheduler/filters/ComputeFilter.kt | 4 +- .../scheduler/filters/DifferentHostFilter.kt | 6 +- .../service/scheduler/filters/HostFilter.kt | 6 +- .../scheduler/filters/InstanceCountFilter.kt | 4 +- .../compute/service/scheduler/filters/RamFilter.kt | 8 +- .../service/scheduler/filters/SameHostFilter.kt | 6 +- .../scheduler/filters/VCpuCapacityFilter.kt | 10 +- .../service/scheduler/filters/VCpuFilter.kt | 8 +- .../service/scheduler/weights/CoreRamWeigher.kt | 4 +- .../service/scheduler/weights/HostWeigher.kt | 12 +- .../scheduler/weights/InstanceCountWeigher.kt | 4 +- .../service/scheduler/weights/RamWeigher.kt | 4 +- .../scheduler/weights/VCpuCapacityWeigher.kt | 8 +- .../service/scheduler/weights/VCpuWeigher.kt | 4 +- .../opendc/compute/service/ComputeServiceTest.kt | 72 ++-- .../opendc/compute/service/ServiceServerTest.kt | 298 -------------- .../org/opendc/compute/service/ServiceTaskTest.kt | 442 +++++++++++++++++++++ .../service/scheduler/FilterSchedulerTest.kt | 176 ++++---- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 66 +-- .../compute/simulator/SimMetaWorkloadMapper.kt | 8 +- .../opendc/compute/simulator/SimWorkloadMapper.kt | 8 +- .../simulator/internal/DefaultWorkloadMapper.kt | 6 +- .../org/opendc/compute/simulator/internal/Guest.kt | 62 +-- .../org/opendc/compute/simulator/SimHostTest.kt | 42 +- .../compute/telemetry/ComputeMetricReader.kt | 92 ++--- .../org/opendc/compute/telemetry/ComputeMonitor.kt | 4 +- .../export/parquet/ComputeExportConfig.kt | 32 +- .../export/parquet/DfltServerExportColumns.kt | 153 ------- .../export/parquet/DfltServiceExportColumns.kt | 12 +- .../export/parquet/DfltTaskExportColumns.kt | 153 +++++++ .../export/parquet/ParquetComputeMonitor.kt | 22 +- .../compute/telemetry/export/parquet/README.md | 16 +- .../opendc/compute/telemetry/table/ServerInfo.kt | 37 -- .../compute/telemetry/table/ServerTableReader.kt | 109 ----- .../opendc/compute/telemetry/table/ServiceData.kt | 12 +- .../compute/telemetry/table/ServiceTableReader.kt | 12 +- .../org/opendc/compute/telemetry/table/TaskInfo.kt | 37 ++ .../compute/telemetry/table/TaskTableReader.kt | 109 +++++ .../opendc/compute/workload/ComputeWorkloads.kt | 2 +- 61 files changed, 1721 insertions(+), 1581 deletions(-) delete mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt create mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientTaskCapacityException.kt delete mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt delete mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt delete mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt create mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Task.kt create mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskState.kt create mode 100644 opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskWatcher.kt delete mode 100644 opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceServer.java create mode 100644 opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceTask.java delete mode 100644 opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt create mode 100644 opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceTaskTest.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskInfo.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index 7863f20b..9e24a3fd 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -84,38 +84,38 @@ public interface ComputeClient : AutoCloseable { ): Image /** - * Obtain the list of [Server]s accessible by the requesting user. + * Obtain the list of [Task]s accessible by the requesting user. */ - public fun queryServers(): List + public fun queryTasks(): List /** - * Obtain a [Server] by its unique identifier. + * Obtain a [Task] by its unique identifier. * - * @param id The identifier of the server. + * @param id The identifier of the task. */ - public fun findServer(id: UUID): Server? + public fun findTask(id: UUID): Task? /** - * Create a new [Server] instance at this compute service. + * Create a new [Task] instance at this compute service. * - * @param name The name of the server to deploy. + * @param name The name of the task to deploy. * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. - * @param labels The identifying labels of the server. - * @param meta The non-identifying meta-data of the server. - * @param start A flag to indicate that the server should be started immediately. + * @param labels The identifying labels of the task. + * @param meta The non-identifying meta-data of the task. + * @param start A flag to indicate that the task should be started immediately. */ - public fun newServer( + public fun newTask( name: String, image: Image, flavor: Flavor, labels: Map = emptyMap(), meta: Map = emptyMap(), start: Boolean = true, - ): Server + ): Task - public fun rescheduleServer( - server: Server, + public fun rescheduleTask( + task: Task, workload: SimWorkload, ) diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index 201a9aed..99042c24 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -23,8 +23,8 @@ package org.opendc.compute.api /** - * Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available - * hardware configuration for a server. It defines the size of a virtual server that can be launched. + * Flavors define the compute and memory capacity of [Task] instance. To put it simply, a flavor is an available + * hardware configuration for a task. It defines the size of a virtual task that can be launched. */ public interface Flavor : Resource { /** @@ -33,7 +33,7 @@ public interface Flavor : Resource { public val coreCount: Int /** - * The amount of RAM available to the server (in MB). + * The amount of RAM available to the task (in MB). */ public val memorySize: Long } diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index c4a04b96..ce7f7f40 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -23,6 +23,6 @@ package org.opendc.compute.api /** - * An image containing a bootable operating system that can directly be executed by physical or virtual server. + * An image containing a bootable operating system that can directly be executed by physical or virtual task. */ public interface Image : Resource diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt deleted file mode 100644 index 497d5266..00000000 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.api - -/** - * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to - * fulfill a launch request. - */ -public class InsufficientServerCapacityException( - override val cause: Throwable? = null, -) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientTaskCapacityException.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientTaskCapacityException.kt new file mode 100644 index 00000000..ef6e4761 --- /dev/null +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientTaskCapacityException.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to + * fulfill a launch request. + */ +public class InsufficientTaskCapacityException( + override val cause: Throwable? = null, +) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt deleted file mode 100644 index b4cc5129..00000000 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.api - -import java.time.Instant - -/** - * A stateful object representing a server instance that is running on some physical or virtual machine. - */ -public interface Server : Resource { - /** - * The flavor of the server. - */ - public val flavor: Flavor - - /** - * The image of the server. - */ - public val image: Image - - /** - * The last known state of the server. - */ - public val state: ServerState - - /** - * The most recent moment in time when the server was launched. - */ - public val launchedAt: Instant? - - /** - * Request the server to be started. - */ - public fun start() - - /** - * Request the server to be stopped. - */ - public fun stop() - - /** - * Register the specified [ServerWatcher] to watch the state of the server. - * - * @param watcher The watcher to register for the server. - */ - public fun watch(watcher: ServerWatcher) - - /** - * De-register the specified [ServerWatcher] from the server to stop it from receiving events. - * - * @param watcher The watcher to de-register from the server. - */ - public fun unwatch(watcher: ServerWatcher) -} diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt deleted file mode 100644 index a4d7d7d7..00000000 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.api - -/** - * An enumeration describing the possible states of a server. - */ -public enum class ServerState { - /** - * Resources are being allocated for the instance. The instance is not running yet. - */ - PROVISIONING, - - /** - * A user shut down the instance. - */ - TERMINATED, - - /** - * The server instance is booting up or running. - */ - RUNNING, - - /** - * The server is in an error state. - */ - ERROR, - - /** - * The server has been deleted and cannot be started later on. - */ - DELETED, -} diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt deleted file mode 100644 index 3229e101..00000000 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.api - -/** - * An interface used to watch the state of [Server] instances. - */ -public interface ServerWatcher { - /** - * This method is invoked when the state of a [Server] changes. - * - * @param server The server whose state has changed. - * @param newState The new state of the server. - */ - public fun onStateChanged( - server: Server, - newState: ServerState, - ) {} -} diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Task.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Task.kt new file mode 100644 index 00000000..c9b0aeb3 --- /dev/null +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Task.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +import java.time.Instant + +/** + * A stateful object representing a task instance that is running on some physical or virtual machine. + */ +public interface Task : Resource { + /** + * The flavor of the task. + */ + public val flavor: Flavor + + /** + * The image of the task. + */ + public val image: Image + + /** + * The last known state of the task. + */ + public val state: TaskState + + /** + * The most recent moment in time when the task was launched. + */ + public val launchedAt: Instant? + + /** + * Request the task to be started. + */ + public fun start() + + /** + * Request the task to be stopped. + */ + public fun stop() + + /** + * Register the specified [TaskWatcher] to watch the state of the task. + * + * @param watcher The watcher to register for the task. + */ + public fun watch(watcher: TaskWatcher) + + /** + * De-register the specified [TaskWatcher] from the task to stop it from receiving events. + * + * @param watcher The watcher to de-register from the task. + */ + public fun unwatch(watcher: TaskWatcher) +} diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskState.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskState.kt new file mode 100644 index 00000000..a093ff47 --- /dev/null +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskState.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * An enumeration describing the possible states of a task. + */ +public enum class TaskState { + /** + * Resources are being allocated for the instance. The instance is not running yet. + */ + PROVISIONING, + + /** + * A user shut down the instance. + */ + TERMINATED, + + /** + * The task instance is booting up or running. + */ + RUNNING, + + /** + * The task is in an error state. + */ + ERROR, + + /** + * The task has been deleted and cannot be started later on. + */ + DELETED, +} diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskWatcher.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskWatcher.kt new file mode 100644 index 00000000..423d7dec --- /dev/null +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/TaskWatcher.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * An interface used to watch the state of [Task] instances. + */ +public interface TaskWatcher { + /** + * This method is invoked when the state of a [Task] changes. + * + * @param task The task whose state has changed. + * @param newState The new state of the task. + */ + public fun onStateChanged( + task: Task, + newState: TaskState, + ) {} +} diff --git a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt index 7b86df07..f4273d2c 100644 --- a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt +++ b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/hostfault/StartStopHostFault.kt @@ -41,14 +41,14 @@ public class StartStopHostFault( val client: ComputeClient = service.newClient() for (host in victims) { - val servers = host.instances + val tasks = host.instances - val sortedServers = servers.sortedBy { it.name } - val snapshots = sortedServers.map { (it.meta["workload"] as SimWorkload).snapshot() } + val sortedTasks = tasks.sortedBy { it.name } + val snapshots = sortedTasks.map { (it.meta["workload"] as SimWorkload).snapshot() } host.fail() - for ((server, snapshot) in servers.zip(snapshots)) { - client.rescheduleServer(server, snapshot) + for ((task, snapshot) in sortedTasks.zip(snapshots)) { + client.rescheduleTask(task, snapshot) } } diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ComputeService.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ComputeService.java index a7e9f509..a64f6a4e 100644 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ComputeService.java +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ComputeService.java @@ -44,8 +44,8 @@ import org.opendc.common.util.Pacer; import org.opendc.compute.api.ComputeClient; import org.opendc.compute.api.Flavor; import org.opendc.compute.api.Image; -import org.opendc.compute.api.Server; -import org.opendc.compute.api.ServerState; +import org.opendc.compute.api.Task; +import org.opendc.compute.api.TaskState; import org.opendc.compute.service.driver.Host; import org.opendc.compute.service.driver.HostListener; import org.opendc.compute.service.driver.HostModel; @@ -68,7 +68,7 @@ public final class ComputeService implements AutoCloseable { private final InstantSource clock; /** - * The {@link ComputeScheduler} responsible for placing the servers onto hosts. + * The {@link ComputeScheduler} responsible for placing the tasks onto hosts. */ private final ComputeScheduler scheduler; @@ -98,14 +98,14 @@ public final class ComputeService implements AutoCloseable { private final Set availableHosts = new HashSet<>(); /** - * The servers that should be launched by the service. + * The tasks that should be launched by the service. */ - private final Deque queue = new ArrayDeque<>(); + private final Deque taskQueue = new ArrayDeque<>(); /** - * The active servers in the system. + * The active tasks in the system. */ - private final Map activeServers = new HashMap<>(); + private final Map activeTasks = new HashMap<>(); /** * The registered flavors for this compute service. @@ -122,14 +122,14 @@ public final class ComputeService implements AutoCloseable { private final List images = new ArrayList<>(); /** - * The registered servers for this compute service. + * The registered tasks for this compute service. */ - private final Map serverById = new HashMap<>(); + private final Map taskById = new HashMap<>(); - private final List servers = new ArrayList<>(); + private final List tasks = new ArrayList<>(); /** - * A [HostListener] used to track the active servers. + * A [HostListener] used to track the active tasks. */ private final HostListener hostListener = new HostListener() { @Override @@ -151,28 +151,26 @@ public final class ComputeService implements AutoCloseable { } @Override - public void onStateChanged(@NotNull Host host, @NotNull Server server, @NotNull ServerState newState) { - final ServiceServer serviceServer = (ServiceServer) server; + public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull TaskState newState) { + final ServiceTask serviceTask = (ServiceTask) task; - if (serviceServer.getHost() != host) { - // This can happen when a server is rescheduled and started on another machine, while being deleted from + if (serviceTask.getHost() != host) { + // This can happen when a task is rescheduled and started on another machine, while being deleted from // the old machine. return; } - serviceServer.setState(newState); + serviceTask.setState(newState); - if (newState == ServerState.TERMINATED - || newState == ServerState.DELETED - || newState == ServerState.ERROR) { - LOGGER.info("Server {} {} {} finished", server.getUid(), server.getName(), server.getFlavor()); + if (newState == TaskState.TERMINATED || newState == TaskState.DELETED || newState == TaskState.ERROR) { + LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor()); - if (activeServers.remove(server) != null) { - serversActive--; + if (activeTasks.remove(task) != null) { + tasksActive--; } HostView hv = hostToView.get(host); - final ServiceFlavor flavor = serviceServer.getFlavor(); + final ServiceFlavor flavor = serviceTask.getFlavor(); if (hv != null) { hv.provisionedCores -= flavor.getCoreCount(); hv.instanceCount--; @@ -192,8 +190,8 @@ public final class ComputeService implements AutoCloseable { private long attemptsSuccess = 0L; private long attemptsFailure = 0L; private long attemptsError = 0L; - private int serversPending = 0; - private int serversActive = 0; + private int tasksPending = 0; + private int tasksActive = 0; /** * Construct a {@link ComputeService} instance. @@ -222,10 +220,10 @@ public final class ComputeService implements AutoCloseable { } /** - * Return the {@link Server}s hosted by this service. + * Return the {@link Task}s hosted by this service. */ - public List getServers() { - return Collections.unmodifiableList(servers); + public List getTasks() { + return Collections.unmodifiableList(tasks); } /** @@ -265,15 +263,14 @@ public final class ComputeService implements AutoCloseable { } /** - * Lookup the {@link Host} that currently hosts the specified {@link Server}. + * Lookup the {@link Host} that currently hosts the specified {@link Task}. */ - public Host lookupHost(Server server) { - if (server instanceof ServiceServer) { - return ((ServiceServer) server).getHost(); + public Host lookupHost(Task task) { + if (task instanceof ServiceTask) { + return ((ServiceTask) task).getHost(); } - ServiceServer internal = - Objects.requireNonNull(serverById.get(server.getUid()), "Invalid server passed to lookupHost"); + ServiceTask internal = Objects.requireNonNull(taskById.get(task.getUid()), "Invalid task passed to lookupHost"); return internal.getHost(); } @@ -294,9 +291,9 @@ public final class ComputeService implements AutoCloseable { attemptsSuccess, attemptsFailure, attemptsError, - servers.size(), - serversPending, - serversActive); + tasks.size(), + tasksPending, + tasksActive); } @Override @@ -310,17 +307,17 @@ public final class ComputeService implements AutoCloseable { } /** - * Enqueue the specified [server] to be scheduled onto a host. + * Enqueue the specified [task] to be scheduled onto a host. */ - SchedulingRequest schedule(ServiceServer server) { - LOGGER.debug("Enqueueing server {} to be assigned to host", server.getUid()); + SchedulingRequest schedule(ServiceTask task) { + LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid()); long now = clock.millis(); - SchedulingRequest request = new SchedulingRequest(server, now); + SchedulingRequest request = new SchedulingRequest(task, now); - server.launchedAt = Instant.ofEpochMilli(now); - queue.add(request); - serversPending++; + task.launchedAt = Instant.ofEpochMilli(now); + taskQueue.add(request); + tasksPending++; requestSchedulingCycle(); return request; } @@ -335,9 +332,9 @@ public final class ComputeService implements AutoCloseable { images.remove(image); } - void delete(ServiceServer server) { - serverById.remove(server.getUid()); - servers.remove(server); + void delete(ServiceTask task) { + taskById.remove(task.getUid()); + tasks.remove(task); } /** @@ -345,7 +342,7 @@ public final class ComputeService implements AutoCloseable { */ private void requestSchedulingCycle() { // Bail out in case the queue is empty. - if (queue.isEmpty()) { + if (taskQueue.isEmpty()) { return; } @@ -358,35 +355,34 @@ public final class ComputeService implements AutoCloseable { private void doSchedule() { // reorder tasks - while (!queue.isEmpty()) { - SchedulingRequest request = queue.peek(); + while (!taskQueue.isEmpty()) { + SchedulingRequest request = taskQueue.peek(); if (request.isCancelled) { - queue.poll(); - serversPending--; + taskQueue.poll(); + tasksPending--; continue; } - final ServiceServer server = request.server; + final ServiceTask task = request.task; // Check if all dependencies are met // otherwise continue - final ServiceFlavor flavor = server.getFlavor(); - final HostView hv = scheduler.select(request.server); + final ServiceFlavor flavor = task.getFlavor(); + final HostView hv = scheduler.select(request.task); - if (hv == null || !hv.getHost().canFit(server)) { - LOGGER.trace( - "Server {} selected for scheduling but no capacity available for it at the moment", server); + if (hv == null || !hv.getHost().canFit(task)) { + LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task); if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) { // Remove the incoming image - queue.poll(); - serversPending--; + taskQueue.poll(); + tasksPending--; attemptsFailure++; - LOGGER.warn("Failed to spawn {}: does not fit", server); + LOGGER.warn("Failed to spawn {}: does not fit", task); - server.setState(ServerState.TERMINATED); + task.setState(TaskState.TERMINATED); continue; } else { break; @@ -396,25 +392,25 @@ public final class ComputeService implements AutoCloseable { Host host = hv.getHost(); // Remove request from queue - queue.poll(); - serversPending--; + taskQueue.poll(); + tasksPending--; - LOGGER.info("Assigned server {} to host {}", server, host); + LOGGER.info("Assigned task {} to host {}", task, host); try { - server.host = host; + task.host = host; - host.spawn(server); - host.start(server); + host.spawn(task); + host.start(task); - serversActive++; + tasksActive++; attemptsSuccess++; hv.instanceCount++; hv.provisionedCores += flavor.getCoreCount(); hv.availableMemory -= flavor.getMemorySize(); - activeServers.put(server, host); + activeTasks.put(task, host); } catch (Exception cause) { LOGGER.error("Failed to deploy VM", cause); attemptsError++; @@ -537,7 +533,7 @@ public final class ComputeService implements AutoCloseable { @NotNull @Override - public Server newServer( + public Task newTask( @NotNull String name, @NotNull Image image, @NotNull Flavor flavor, @@ -554,31 +550,31 @@ public final class ComputeService implements AutoCloseable { final ServiceImage internalImage = Objects.requireNonNull(service.imageById.get(image.getUid()), "Unknown image"); - ServiceServer server = new ServiceServer(service, uid, name, internalFlavor, internalImage, labels, meta); + ServiceTask task = new ServiceTask(service, uid, name, internalFlavor, internalImage, labels, meta); - service.serverById.put(uid, server); - service.servers.add(server); + service.taskById.put(uid, task); + service.tasks.add(task); if (start) { - server.start(); + task.start(); } - return server; + return task; } @Nullable @Override - public Server findServer(@NotNull UUID id) { + public Task findTask(@NotNull UUID id) { checkOpen(); - return service.serverById.get(id); + return service.taskById.get(id); } @NotNull @Override - public List queryServers() { + public List queryTasks() { checkOpen(); - return new ArrayList<>(service.servers); + return new ArrayList<>(service.tasks); } @Override @@ -593,30 +589,30 @@ public final class ComputeService implements AutoCloseable { @Nullable @Override - public void rescheduleServer(@NotNull Server server, @NotNull SimWorkload workload) { - ServiceServer internalServer = (ServiceServer) findServer(server.getUid()); - Host from = service.lookupHost(internalServer); + public void rescheduleTask(@NotNull Task task, @NotNull SimWorkload workload) { + ServiceTask internalTask = (ServiceTask) findTask(task.getUid()); + Host from = service.lookupHost(internalTask); - from.delete(internalServer); + from.delete(internalTask); - internalServer.host = null; + internalTask.host = null; - internalServer.setWorkload(workload); - internalServer.start(); + internalTask.setWorkload(workload); + internalTask.start(); } } /** - * A request to schedule a {@link ServiceServer} onto one of the {@link Host}s. + * A request to schedule a {@link ServiceTask} onto one of the {@link Host}s. */ static class SchedulingRequest { - final ServiceServer server; + final ServiceTask task; final long submitTime; boolean isCancelled; - SchedulingRequest(ServiceServer server, long submitTime) { - this.server = server; + SchedulingRequest(ServiceTask task, long submitTime) { + this.task = task; this.submitTime = submitTime; } } diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceServer.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceServer.java deleted file mode 100644 index e363faf2..00000000 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceServer.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opendc.compute.api.Server; -import org.opendc.compute.api.ServerState; -import org.opendc.compute.api.ServerWatcher; -import org.opendc.compute.service.driver.Host; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of {@link Server} provided by {@link ComputeService}. - */ -public final class ServiceServer implements Server { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceServer.class); - - private final ComputeService service; - private final UUID uid; - - private final String name; - private final ServiceFlavor flavor; - private final ServiceImage image; - private final Map labels; - private Map meta; - - private final List watchers = new ArrayList<>(); - private ServerState state = ServerState.TERMINATED; - Instant launchedAt = null; - Host host = null; - private ComputeService.SchedulingRequest request = null; - - ServiceServer( - ComputeService service, - UUID uid, - String name, - ServiceFlavor flavor, - ServiceImage image, - Map labels, - Map meta) { - this.service = service; - this.uid = uid; - this.name = name; - this.flavor = flavor; - this.image = image; - this.labels = labels; - this.meta = meta; - } - - @NotNull - @Override - public UUID getUid() { - return uid; - } - - @NotNull - @Override - public String getName() { - return name; - } - - @NotNull - @Override - public ServiceFlavor getFlavor() { - return flavor; - } - - @NotNull - @Override - public ServiceImage getImage() { - return image; - } - - @NotNull - @Override - public Map getLabels() { - return Collections.unmodifiableMap(labels); - } - - @NotNull - @Override - public Map getMeta() { - return Collections.unmodifiableMap(meta); - } - - public void setWorkload(Object _workload) { - Map new_meta = new HashMap(); - new_meta.put("workload", _workload); - - meta = new_meta; - } - - @NotNull - @Override - public ServerState getState() { - return state; - } - - @Nullable - @Override - public Instant getLaunchedAt() { - return launchedAt; - } - - /** - * Return the {@link Host} on which the server is running or null if it is not running on a host. - */ - public Host getHost() { - return host; - } - - @Override - public void start() { - switch (state) { - case PROVISIONING: - LOGGER.debug("User tried to start server but request is already pending: doing nothing"); - case RUNNING: - LOGGER.debug("User tried to start server but server is already running"); - break; - case DELETED: - LOGGER.warn("User tried to start deleted server"); - throw new IllegalStateException("Server is deleted"); - default: - LOGGER.info("User requested to start server {}", uid); - setState(ServerState.PROVISIONING); - assert request == null : "Scheduling request already active"; - request = service.schedule(this); - break; - } - } - - @Override - public void stop() { - switch (state) { - case PROVISIONING: - cancelProvisioningRequest(); - setState(ServerState.TERMINATED); - break; - case RUNNING: - case ERROR: - final Host host = this.host; - if (host == null) { - throw new IllegalStateException("Server not running"); - } - host.stop(this); - break; - } - } - - @Override - public void watch(@NotNull ServerWatcher watcher) { - watchers.add(watcher); - } - - @Override - public void unwatch(@NotNull ServerWatcher watcher) { - watchers.remove(watcher); - } - - @Override - public void reload() { - // No-op: this object is the source-of-truth - } - - @Override - public void delete() { - switch (state) { - case PROVISIONING: - case TERMINATED: - cancelProvisioningRequest(); - service.delete(this); - setState(ServerState.DELETED); - break; - case RUNNING: - case ERROR: - final Host host = this.host; - if (host == null) { - throw new IllegalStateException("Server not running"); - } - host.delete(this); - service.delete(this); - setState(ServerState.DELETED); - break; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ServiceServer server = (ServiceServer) o; - return service.equals(server.service) && uid.equals(server.uid); - } - - @Override - public int hashCode() { - return Objects.hash(service, uid); - } - - @Override - public String toString() { - return "Server[uid=" + uid + ",name=" + name + ",state=" + state + "]"; - } - - void setState(ServerState state) { - if (this.state != state) { - for (ServerWatcher watcher : watchers) { - watcher.onStateChanged(this, state); - } - } - - this.state = state; - } - - /** - * Cancel the provisioning request if active. - */ - private void cancelProvisioningRequest() { - final ComputeService.SchedulingRequest request = this.request; - if (request != null) { - this.request = null; - request.isCancelled = true; - } - } -} diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceTask.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceTask.java new file mode 100644 index 00000000..e981921a --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/ServiceTask.java @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.opendc.compute.api.Task; +import org.opendc.compute.api.TaskState; +import org.opendc.compute.api.TaskWatcher; +import org.opendc.compute.service.driver.Host; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link Task} provided by {@link ComputeService}. + */ +public final class ServiceTask implements Task { + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTask.class); + + private final ComputeService service; + private final UUID uid; + + private final String name; + private final ServiceFlavor flavor; + private final ServiceImage image; + private final Map labels; + private Map meta; + + private final List watchers = new ArrayList<>(); + private TaskState state = TaskState.TERMINATED; + Instant launchedAt = null; + Host host = null; + private ComputeService.SchedulingRequest request = null; + + ServiceTask( + ComputeService service, + UUID uid, + String name, + ServiceFlavor flavor, + ServiceImage image, + Map labels, + Map meta) { + this.service = service; + this.uid = uid; + this.name = name; + this.flavor = flavor; + this.image = image; + this.labels = labels; + this.meta = meta; + } + + @NotNull + @Override + public UUID getUid() { + return uid; + } + + @NotNull + @Override + public String getName() { + return name; + } + + @NotNull + @Override + public ServiceFlavor getFlavor() { + return flavor; + } + + @NotNull + @Override + public ServiceImage getImage() { + return image; + } + + @NotNull + @Override + public Map getLabels() { + return Collections.unmodifiableMap(labels); + } + + @NotNull + @Override + public Map getMeta() { + return Collections.unmodifiableMap(meta); + } + + public void setWorkload(Object _workload) { + Map new_meta = new HashMap(); + new_meta.put("workload", _workload); + + meta = new_meta; + } + + @NotNull + @Override + public TaskState getState() { + return state; + } + + @Nullable + @Override + public Instant getLaunchedAt() { + return launchedAt; + } + + /** + * Return the {@link Host} on which the task is running or null if it is not running on a host. + */ + public Host getHost() { + return host; + } + + @Override + public void start() { + switch (state) { + case PROVISIONING: + LOGGER.debug("User tried to start task but request is already pending: doing nothing"); + case RUNNING: + LOGGER.debug("User tried to start task but task is already running"); + break; + case DELETED: + LOGGER.warn("User tried to start deleted task"); + throw new IllegalStateException("Task is deleted"); + default: + LOGGER.info("User requested to start task {}", uid); + setState(TaskState.PROVISIONING); + assert request == null : "Scheduling request already active"; + request = service.schedule(this); + break; + } + } + + @Override + public void stop() { + switch (state) { + case PROVISIONING: + cancelProvisioningRequest(); + setState(TaskState.TERMINATED); + break; + case RUNNING: + case ERROR: + final Host host = this.host; + if (host == null) { + throw new IllegalStateException("Task not running"); + } + host.stop(this); + break; + } + } + + @Override + public void watch(@NotNull TaskWatcher watcher) { + watchers.add(watcher); + } + + @Override + public void unwatch(@NotNull TaskWatcher watcher) { + watchers.remove(watcher); + } + + @Override + public void reload() { + // No-op: this object is the source-of-truth + } + + @Override + public void delete() { + switch (state) { + case PROVISIONING: + case TERMINATED: + cancelProvisioningRequest(); + service.delete(this); + setState(TaskState.DELETED); + break; + case RUNNING: + case ERROR: + final Host host = this.host; + if (host == null) { + throw new IllegalStateException("Task not running"); + } + host.delete(this); + service.delete(this); + setState(TaskState.DELETED); + break; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ServiceTask task = (ServiceTask) o; + return service.equals(task.service) && uid.equals(task.uid); + } + + @Override + public int hashCode() { + return Objects.hash(service, uid); + } + + @Override + public String toString() { + return "Task[uid=" + uid + ",name=" + name + ",state=" + state + "]"; + } + + void setState(TaskState state) { + if (this.state != state) { + for (TaskWatcher watcher : watchers) { + watcher.onStateChanged(this, state); + } + } + + this.state = state; + } + + /** + * Cancel the provisioning request if active. + */ + private void cancelProvisioningRequest() { + final ComputeService.SchedulingRequest request = this.request; + if (request != null) { + this.request = null; + request.isCancelled = true; + } + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/Host.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/Host.java index 760d7f1a..546f774b 100644 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/Host.java +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/Host.java @@ -25,14 +25,14 @@ package org.opendc.compute.service.driver; import java.util.Map; import java.util.Set; import java.util.UUID; -import org.opendc.compute.api.Server; +import org.opendc.compute.api.Task; import org.opendc.compute.service.driver.telemetry.GuestCpuStats; import org.opendc.compute.service.driver.telemetry.GuestSystemStats; import org.opendc.compute.service.driver.telemetry.HostCpuStats; import org.opendc.compute.service.driver.telemetry.HostSystemStats; /** - * Base interface for representing compute resources that host virtualized {@link Server} instances. + * Base interface for representing compute resources that host virtualized {@link Task} instances. */ public interface Host { /** @@ -61,43 +61,43 @@ public interface Host { Map getMeta(); /** - * Return the {@link Server} instances known to the host. + * Return the {@link Task} instances known to the host. */ - Set getInstances(); + Set getInstances(); /** - * Determine whether the specified server can still fit on this host. + * Determine whether the specified task can still fit on this host. */ - boolean canFit(Server server); + boolean canFit(Task task); /** - * Register the specified server on the host. + * Register the specified task on the host. */ - void spawn(Server server); + void spawn(Task task); /** - * Determine whether the specified server exists on the host. + * Determine whether the specified task exists on the host. */ - boolean contains(Server server); + boolean contains(Task task); /** - * Start the server if it is currently not running on this host. + * Start the task if it is currently not running on this host. * - * @throws IllegalArgumentException if the server is not present on the host. + * @throws IllegalArgumentException if the task is not present on the host. */ - void start(Server server); + void start(Task task); /** - * Stop the server if it is currently running on this host. + * Stop the task if it is currently running on this host. * - * @throws IllegalArgumentException if the server is not present on the host. + * @throws IllegalArgumentException if the task is not present on the host. */ - void stop(Server server); + void stop(Task task); /** - * Delete the specified server on this host and cleanup all resources associated with it. + * Delete the specified task on this host and cleanup all resources associated with it. */ - void delete(Server server); + void delete(Task task); /** * Add a [HostListener] to this host. @@ -115,12 +115,12 @@ public interface Host { HostSystemStats getSystemStats(); /** - * Query the system statistics of a {@link Server} that is located on this host. + * Query the system statistics of a {@link Task} that is located on this host. * - * @param server The {@link Server} to obtain the system statistics of. - * @throws IllegalArgumentException if the server is not present on the host. + * @param task The {@link Task} to obtain the system statistics of. + * @throws IllegalArgumentException if the task is not present on the host. */ - GuestSystemStats getSystemStats(Server server); + GuestSystemStats getSystemStats(Task task); /** * Query the CPU statistics of the host. @@ -128,10 +128,10 @@ public interface Host { HostCpuStats getCpuStats(); /** - * Query the CPU statistics of a {@link Server} that is located on this host. + * Query the CPU statistics of a {@link Task} that is located on this host. * - * @param server The {@link Server} to obtain the CPU statistics of. - * @throws IllegalArgumentException if the server is not present on the host. + * @param task The {@link Task} to obtain the CPU statistics of. + * @throws IllegalArgumentException if the task is not present on the host. */ - GuestCpuStats getCpuStats(Server server); + GuestCpuStats getCpuStats(Task task); } diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/HostListener.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/HostListener.java index feefca40..079c6cff 100644 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/HostListener.java +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/HostListener.java @@ -22,17 +22,17 @@ package org.opendc.compute.service.driver; -import org.opendc.compute.api.Server; -import org.opendc.compute.api.ServerState; +import org.opendc.compute.api.Task; +import org.opendc.compute.api.TaskState; /** * Listener interface for events originating from a {@link Host}. */ public interface HostListener { /** - * This method is invoked when the state of server on host changes. + * This method is invoked when the state of task on host changes. */ - default void onStateChanged(Host host, Server server, ServerState newState) {} + default void onStateChanged(Host host, Task task, TaskState newState) {} /** * This method is invoked when the state of a {@link Host} has changed. diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/telemetry/HostSystemStats.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/telemetry/HostSystemStats.java index d9dba274..c0713f3c 100644 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/telemetry/HostSystemStats.java +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/driver/telemetry/HostSystemStats.java @@ -30,7 +30,7 @@ import java.time.Instant; * * @param uptime The cumulative uptime of the host since last boot (in ms). * @param downtime The cumulative downtime of the host since last boot (in ms). - * @param bootTime The time at which the server started. + * @param bootTime The time at which the task started. * @param powerDraw Instantaneous power draw of the system (in W). * @param energyUsage The cumulative energy usage of the system (in J). * @param guestsTerminated The number of guests that are in a terminated state. diff --git a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/telemetry/SchedulerStats.java b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/telemetry/SchedulerStats.java index 2157169b..fc044d8c 100644 --- a/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/telemetry/SchedulerStats.java +++ b/opendc-compute/opendc-compute-service/src/main/java/org/opendc/compute/service/telemetry/SchedulerStats.java @@ -30,9 +30,9 @@ package org.opendc.compute.service.telemetry; * @param attemptsSuccess Scheduling attempts that resulted into an allocation onto a host. * @param attemptsFailure The number of failed scheduling attempt due to insufficient capacity at the moment. * @param attemptsError The number of scheduling attempts that failed due to system error. - * @param serversTotal The number of servers registered with the service. - * @param serversPending The number of servers that are pending to be scheduled. - * @param serversActive The number of servers that are currently managed by the service and running. + * @param tasksTotal The number of tasks registered with the service. + * @param tasksPending The number of tasks that are pending to be scheduled. + * @param tasksActive The number of tasks that are currently managed by the service and running. */ public record SchedulerStats( int hostsAvailable, @@ -40,6 +40,6 @@ public record SchedulerStats( long attemptsSuccess, long attemptsFailure, long attemptsError, - int serversTotal, - int serversPending, - int serversActive) {} + int tasksTotal, + int tasksPending, + int tasksActive) {} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt index 0ccaf991..42de9ebc 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt @@ -22,12 +22,12 @@ package org.opendc.compute.service.scheduler -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.ComputeService import org.opendc.compute.service.HostView /** - * A generic scheduler interface used by the [ComputeService] to select hosts to place [Server]s on. + * A generic scheduler interface used by the [ComputeService] to select hosts to place [Task]s on. */ public interface ComputeScheduler { /** @@ -41,10 +41,10 @@ public interface ComputeScheduler { public fun removeHost(host: HostView) /** - * Select a host for the specified [server]. + * Select a host for the specified [task]. * - * @param server The server to select a host for. + * @param task The server to select a host for. * @return The host to schedule the server on or `null` if no server is available. */ - public fun select(server: Server): HostView? + public fun select(task: Task): HostView? } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 41118386..772a470d 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import org.opendc.compute.service.scheduler.filters.HostFilter import org.opendc.compute.service.scheduler.weights.HostWeigher @@ -32,7 +32,7 @@ import kotlin.math.min /** * A [ComputeScheduler] implementation that uses filtering and weighing passes to select - * the host to schedule a [Server] on. + * the host to schedule a [Task] on. * * This implementation is based on the filter scheduler from OpenStack Nova. * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html @@ -65,13 +65,13 @@ public class FilterScheduler( hosts.remove(host) } - override fun select(server: Server): HostView? { + override fun select(task: Task): HostView? { val hosts = hosts - val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } } + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } } val subset = if (weighers.isNotEmpty()) { - val results = weighers.map { it.getWeights(filteredHosts, server) } + val results = weighers.map { it.getWeights(filteredHosts, task) } val weights = DoubleArray(filteredHosts.size) for (result in results) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt index a6703c89..d1690ddf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt @@ -23,7 +23,7 @@ package org.opendc.compute.service.scheduler import mu.KotlinLogging -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -48,14 +48,14 @@ public class ReplayScheduler(private val vmPlacements: Map) : Co hosts.remove(host) } - override fun select(server: Server): HostView? { + override fun select(task: Task): HostView? { val clusterName = - vmPlacements[server.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") + vmPlacements[task.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${task.name}") val machinesInCluster = hosts.filter { it.host.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${task.name}, assigning randomly." } return hosts.maxByOrNull { it.availableMemory } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt index dd707f60..2ad626f3 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import org.opendc.compute.service.driver.HostState @@ -32,7 +32,7 @@ import org.opendc.compute.service.driver.HostState public class ComputeFilter : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { val result = host.host.state == HostState.UP return result diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt index df67a19f..ffafeaa9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import java.util.UUID @@ -32,10 +32,10 @@ import java.util.UUID public class DifferentHostFilter : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { @Suppress("UNCHECKED_CAST") - val affinityUUIDs = server.meta["scheduler_hint:different_host"] as? Set ?: return true + val affinityUUIDs = task.meta["scheduler_hint:different_host"] as? Set ?: return true return host.host.instances.none { it.uid in affinityUUIDs } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt index 902c760e..f506127a 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import org.opendc.compute.service.scheduler.FilterScheduler @@ -32,10 +32,10 @@ import org.opendc.compute.service.scheduler.FilterScheduler public fun interface HostFilter { /** * Test whether the specified [host] should be included in the selection - * for scheduling the specified [server]. + * for scheduling the specified [task]. */ public fun test( host: HostView, - server: Server, + task: Task, ): Boolean } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt index d9348802..7d5eb400 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -33,7 +33,7 @@ import org.opendc.compute.service.HostView public class InstanceCountFilter(private val limit: Int) : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { return host.instanceCount < limit } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt index d8c3d540..0a28ccc6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -22,20 +22,20 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** - * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host. + * A [HostFilter] that filters hosts based on the memory requirements of a [Task] and the RAM available on the host. * * @param allocationRatio Virtual RAM to physical RAM allocation ratio. */ public class RamFilter(private val allocationRatio: Double) : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { - val requestedMemory = server.flavor.memorySize + val requestedMemory = task.flavor.memorySize val availableMemory = host.availableMemory val memoryCapacity = host.host.model.memoryCapacity diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt index 4c31c66a..d8634285 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import java.util.UUID @@ -32,10 +32,10 @@ import java.util.UUID public class SameHostFilter : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { @Suppress("UNCHECKED_CAST") - val affinityUUIDs = server.meta["scheduler_hint:same_host"] as? Set ?: return true + val affinityUUIDs = task.meta["scheduler_hint:same_host"] as? Set ?: return true return host.host.instances.any { it.uid in affinityUUIDs } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt index 01ece80e..f87658cd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt @@ -22,22 +22,22 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** - * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available + * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Task] and the available * capacity on the host. */ public class VCpuCapacityFilter : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { - val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double + val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double val hostModel = host.host.model val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount - return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.coreCount) + return requiredCapacity == null || availableCapacity >= (requiredCapacity / task.flavor.coreCount) } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt index cefb3f7a..442e58f6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt @@ -22,20 +22,20 @@ package org.opendc.compute.service.scheduler.filters -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** - * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host. + * A [HostFilter] that filters hosts based on the vCPU requirements of a [Task] and the available vCPUs on the host. * * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ public class VCpuFilter(private val allocationRatio: Double) : HostFilter { override fun test( host: HostView, - server: Server, + task: Task, ): Boolean { - val requested = server.flavor.coreCount + val requested = task.flavor.coreCount val totalCores = host.host.model.coreCount val limit = totalCores * allocationRatio diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt index d6aafbc7..d84f5e68 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -35,7 +35,7 @@ import org.opendc.compute.service.HostView public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight( host: HostView, - server: Server, + task: Task, ): Double { return host.availableMemory.toDouble() / host.host.model.cpuCount } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt index 825cfff9..3f2c4123 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import org.opendc.compute.service.scheduler.FilterScheduler @@ -36,26 +36,26 @@ public interface HostWeigher { public val multiplier: Double /** - * Obtain the weight of the specified [host] when scheduling the specified [server]. + * Obtain the weight of the specified [host] when scheduling the specified [task]. */ public fun getWeight( host: HostView, - server: Server, + task: Task, ): Double /** - * Obtain the weights for [hosts] when scheduling the specified [server]. + * Obtain the weights for [hosts] when scheduling the specified [task]. */ public fun getWeights( hosts: List, - server: Server, + task: Task, ): Result { val weights = DoubleArray(hosts.size) var min = Double.MAX_VALUE var max = Double.MIN_VALUE for ((i, host) in hosts.withIndex()) { - val weight = getWeight(host, server) + val weight = getWeight(host, task) weights[i] = weight min = kotlin.math.min(min, weight) max = kotlin.math.max(max, weight) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt index 9e0a9517..0789f109 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -31,7 +31,7 @@ import org.opendc.compute.service.HostView public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight( host: HostView, - server: Server, + task: Task, ): Double { return host.instanceCount.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt index fca2e893..fb03d064 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -34,7 +34,7 @@ import org.opendc.compute.service.HostView public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight( host: HostView, - server: Server, + task: Task, ): Double { return host.availableMemory.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt index 242660c3..6d1482ff 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -31,11 +31,11 @@ import org.opendc.compute.service.HostView public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight( host: HostView, - server: Server, + task: Task, ): Double { val model = host.host.model - val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0 - return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.coreCount + val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double ?: 0.0 + return model.cpuCapacity / model.cpuCount - requiredCapacity / task.flavor.coreCount } override fun toString(): String = "VCpuWeigher" diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt index be93458f..dfc30b54 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt @@ -22,7 +22,7 @@ package org.opendc.compute.service.scheduler.weights -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView /** @@ -37,7 +37,7 @@ public class VCpuWeigher(private val allocationRatio: Double, override val multi override fun getWeight( host: HostView, - server: Server, + task: Task, ): Double { return host.host.model.cpuCount * allocationRatio - host.provisionedCores } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 32d01660..e48244f0 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -36,9 +36,9 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.compute.api.Flavor import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState +import org.opendc.compute.api.TaskWatcher import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostModel @@ -78,21 +78,21 @@ internal class ComputeServiceTest { assertEquals(emptyList(), client.queryFlavors()) assertEquals(emptyList(), client.queryImages()) - assertEquals(emptyList(), client.queryServers()) + assertEquals(emptyList(), client.queryTasks()) client.close() assertThrows { client.queryFlavors() } assertThrows { client.queryImages() } - assertThrows { client.queryServers() } + assertThrows { client.queryTasks() } assertThrows { client.findFlavor(UUID.randomUUID()) } assertThrows { client.findImage(UUID.randomUUID()) } - assertThrows { client.findServer(UUID.randomUUID()) } + assertThrows { client.findTask(UUID.randomUUID()) } assertThrows { client.newFlavor("test", 1, 2) } assertThrows { client.newImage("test") } - assertThrows { client.newServer("test", mockk(), mockk()) } + assertThrows { client.newTask("test", mockk(), mockk()) } } @Test @@ -106,12 +106,12 @@ internal class ComputeServiceTest { val image = client.newImage("test") assertEquals(listOf(image), client.queryImages()) assertEquals(image, client.findImage(image.uid)) - val server = client.newServer("test", image, flavor, start = false) - assertEquals(listOf(server), client.queryServers()) - assertEquals(server, client.findServer(server.uid)) + val server = client.newTask("test", image, flavor, start = false) + assertEquals(listOf(server), client.queryTasks()) + assertEquals(server, client.findTask(server.uid)) server.delete() - assertNull(client.findServer(server.uid)) + assertNull(client.findTask(server.uid)) image.delete() assertNull(client.findImage(image.uid)) @@ -174,12 +174,12 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 0) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.TERMINATED, server.state) + assertEquals(TaskState.TERMINATED, server.state) } @Test @@ -188,12 +188,12 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 0, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.TERMINATED, server.state) + assertEquals(TaskState.TERMINATED, server.state) } @Test @@ -202,12 +202,12 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.TERMINATED, server.state) + assertEquals(TaskState.TERMINATED, server.state) } @Test @@ -216,13 +216,13 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() server.stop() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.TERMINATED, server.state) + assertEquals(TaskState.TERMINATED, server.state) } @Test @@ -239,12 +239,12 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(10L * 60 * 1000) server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + assertEquals(TaskState.PROVISIONING, server.state) verify { host.canFit(server) } } @@ -266,7 +266,7 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(5L * 60 * 1000) @@ -276,7 +276,7 @@ internal class ComputeServiceTest { delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + assertEquals(TaskState.PROVISIONING, server.state) verify { host.canFit(server) } } @@ -298,7 +298,7 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) delay(5L * 60 * 1000) @@ -308,7 +308,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + assertEquals(TaskState.PROVISIONING, server.state) verify(exactly = 0) { host.canFit(server) } } @@ -330,10 +330,10 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - val slot = slot() + val server = client.newTask("test", image, flavor, start = false) + val slot = slot() - val watcher = mockk(relaxUnitFun = true) + val watcher = mockk(relaxUnitFun = true) server.watch(watcher) // Start server @@ -341,20 +341,20 @@ internal class ComputeServiceTest { delay(5L * 60 * 1000) coVerify { host.spawn(capture(slot)) } - listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } + listeners.forEach { it.onStateChanged(host, slot.captured, TaskState.RUNNING) } server.reload() - assertEquals(ServerState.RUNNING, server.state) + assertEquals(TaskState.RUNNING, server.state) - verify { watcher.onStateChanged(server, ServerState.RUNNING) } + verify { watcher.onStateChanged(server, TaskState.RUNNING) } // Stop server - listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) } + listeners.forEach { it.onStateChanged(host, slot.captured, TaskState.TERMINATED) } server.reload() - assertEquals(ServerState.TERMINATED, server.state) + assertEquals(TaskState.TERMINATED, server.state) - verify { watcher.onStateChanged(server, ServerState.TERMINATED) } + verify { watcher.onStateChanged(server, TaskState.TERMINATED) } } @Test @@ -375,12 +375,12 @@ internal class ComputeServiceTest { val client = service.newClient() val flavor = client.newFlavor("test", 1, 1024) val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val server = client.newTask("test", image, flavor, start = false) server.start() delay(5L * 60 * 1000) server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + assertEquals(TaskState.PROVISIONING, server.state) } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt deleted file mode 100644 index b420ee3b..00000000 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service - -import io.mockk.every -import io.mockk.mockk -import io.mockk.verify -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotEquals -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.simulator.kotlin.runSimulation -import java.util.UUID - -/** - * Test suite for the [ServiceServer] implementation. - */ -class ServiceServerTest { - @Test - fun testEquality() { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - - val a = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val b = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - assertEquals(a, b) - } - - @Test - fun testInequalityWithDifferentType() { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val a = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - val b = mockk(relaxUnitFun = true) - every { b.uid } returns UUID.randomUUID() - - assertNotEquals(a, b) - } - - @Test - fun testInequalityWithIncorrectType() { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val a = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - assertNotEquals(a, Unit) - } - - @Test - fun testStartTerminatedServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - every { service.schedule(any()) } answers { ComputeService.SchedulingRequest(it.invocation.args[0] as ServiceServer, 0) } - - server.start() - - verify(exactly = 1) { service.schedule(server) } - assertEquals(ServerState.PROVISIONING, server.state) - } - - @Test - fun testStartDeletedServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.DELETED) - - assertThrows { server.start() } - } - - @Test - fun testStartProvisioningServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.PROVISIONING) - - server.start() - - assertEquals(ServerState.PROVISIONING, server.state) - } - - @Test - fun testStartRunningServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.RUNNING) - - server.start() - - assertEquals(ServerState.RUNNING, server.state) - } - - @Test - fun testStopProvisioningServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeService.SchedulingRequest(server, 0) - - every { service.schedule(any()) } returns request - - server.start() - server.stop() - - assertTrue(request.isCancelled) - assertEquals(ServerState.TERMINATED, server.state) - } - - @Test - fun testStopTerminatedServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.TERMINATED) - server.stop() - - assertEquals(ServerState.TERMINATED, server.state) - } - - @Test - fun testStopDeletedServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.DELETED) - server.stop() - - assertEquals(ServerState.DELETED, server.state) - } - - @Test - fun testStopRunningServer() = - runSimulation { - val service = mockk() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val host = mockk(relaxUnitFun = true) - - server.setState(ServerState.RUNNING) - server.host = host - server.stop() - yield() - - verify { host.stop(server) } - } - - @Test - fun testDeleteProvisioningServer() = - runSimulation { - val service = mockk(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeService.SchedulingRequest(server, 0) - - every { service.schedule(any()) } returns request - - server.start() - server.delete() - - assertTrue(request.isCancelled) - assertEquals(ServerState.DELETED, server.state) - verify { service.delete(server) } - } - - @Test - fun testDeleteTerminatedServer() = - runSimulation { - val service = mockk(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.TERMINATED) - server.delete() - - assertEquals(ServerState.DELETED, server.state) - - verify { service.delete(server) } - } - - @Test - fun testDeleteDeletedServer() = - runSimulation { - val service = mockk(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - - server.setState(ServerState.DELETED) - server.delete() - - assertEquals(ServerState.DELETED, server.state) - } - - @Test - fun testDeleteRunningServer() = - runSimulation { - val service = mockk(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val host = mockk(relaxUnitFun = true) - - server.setState(ServerState.RUNNING) - server.host = host - server.delete() - yield() - - verify { host.delete(server) } - verify { service.delete(server) } - } - - private fun mockFlavor(): ServiceFlavor { - val flavor = mockk() - every { flavor.name } returns "c5.large" - every { flavor.uid } returns UUID.randomUUID() - every { flavor.coreCount } returns 2 - every { flavor.memorySize } returns 4096 - return flavor - } - - private fun mockImage(): ServiceImage { - val image = mockk() - every { image.name } returns "ubuntu-20.04" - every { image.uid } returns UUID.randomUUID() - return image - } -} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceTaskTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceTaskTest.kt new file mode 100644 index 00000000..e77665fe --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceTaskTest.kt @@ -0,0 +1,442 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState +import org.opendc.compute.service.driver.Host +import org.opendc.simulator.kotlin.runSimulation +import java.util.UUID + +/** + * Test suite for the [ServiceTask] implementation. + */ +class ServiceTaskTest { + @Test + fun testEquality() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + + val a = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + val b = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val a = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val a = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + assertNotEquals(a, Unit) + } + + @Test + fun testStartTerminatedServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + every { service.schedule(any()) } answers { ComputeService.SchedulingRequest(it.invocation.args[0] as ServiceTask, 0) } + + server.start() + + verify(exactly = 1) { service.schedule(server) } + assertEquals(TaskState.PROVISIONING, server.state) + } + + @Test + fun testStartDeletedServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.DELETED) + + assertThrows { server.start() } + } + + @Test + fun testStartProvisioningServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.PROVISIONING) + + server.start() + + assertEquals(TaskState.PROVISIONING, server.state) + } + + @Test + fun testStartRunningServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.RUNNING) + + server.start() + + assertEquals(TaskState.RUNNING, server.state) + } + + @Test + fun testStopProvisioningServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + val request = ComputeService.SchedulingRequest(server, 0) + + every { service.schedule(any()) } returns request + + server.start() + server.stop() + + assertTrue(request.isCancelled) + assertEquals(TaskState.TERMINATED, server.state) + } + + @Test + fun testStopTerminatedServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.TERMINATED) + server.stop() + + assertEquals(TaskState.TERMINATED, server.state) + } + + @Test + fun testStopDeletedServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.DELETED) + server.stop() + + assertEquals(TaskState.DELETED, server.state) + } + + @Test + fun testStopRunningServer() = + runSimulation { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + val host = mockk(relaxUnitFun = true) + + server.setState(TaskState.RUNNING) + server.host = host + server.stop() + yield() + + verify { host.stop(server) } + } + + @Test + fun testDeleteProvisioningServer() = + runSimulation { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + val request = ComputeService.SchedulingRequest(server, 0) + + every { service.schedule(any()) } returns request + + server.start() + server.delete() + + assertTrue(request.isCancelled) + assertEquals(TaskState.DELETED, server.state) + verify { service.delete(server) } + } + + @Test + fun testDeleteTerminatedServer() = + runSimulation { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.TERMINATED) + server.delete() + + assertEquals(TaskState.DELETED, server.state) + + verify { service.delete(server) } + } + + @Test + fun testDeleteDeletedServer() = + runSimulation { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + + server.setState(TaskState.DELETED) + server.delete() + + assertEquals(TaskState.DELETED, server.state) + } + + @Test + fun testDeleteRunningServer() = + runSimulation { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = + ServiceTask( + service, + uid, + "test", + flavor, + image, + mutableMapOf(), + mutableMapOf(), + ) + val host = mockk(relaxUnitFun = true) + + server.setState(TaskState.RUNNING) + server.host = host + server.delete() + yield() + + verify { host.delete(server) } + verify { service.delete(server) } + } + + private fun mockFlavor(): ServiceFlavor { + val flavor = mockk() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.coreCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): ServiceImage { + val image = mockk() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } +} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt index 3bcecf9b..2478bf82 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -29,7 +29,7 @@ import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.service.HostView import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState @@ -78,11 +78,11 @@ internal class FilterSchedulerTest { weighers = emptyList(), ) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertNull(scheduler.select(server)) + assertNull(scheduler.select(task)) } @Test @@ -102,14 +102,14 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 // Make sure we get the first host both times assertAll( - { assertEquals(hostA, scheduler.select(server)) }, - { assertEquals(hostA, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(task)) }, + { assertEquals(hostA, scheduler.select(task)) }, ) } @@ -132,14 +132,14 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 // Make sure we get the first host both times assertAll( - { assertEquals(hostB, scheduler.select(server)) }, - { assertEquals(hostA, scheduler.select(server)) }, + { assertEquals(hostB, scheduler.select(task)) }, + { assertEquals(hostA, scheduler.select(task)) }, ) } @@ -156,11 +156,11 @@ internal class FilterSchedulerTest { scheduler.addHost(host) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertNull(scheduler.select(server)) + assertNull(scheduler.select(task)) } @Test @@ -176,11 +176,11 @@ internal class FilterSchedulerTest { scheduler.addHost(host) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(host, scheduler.select(server)) + assertEquals(host, scheduler.select(task)) } @Test @@ -204,11 +204,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -226,11 +226,11 @@ internal class FilterSchedulerTest { scheduler.addHost(host) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 2300 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 2300 - assertNull(scheduler.select(server)) + assertNull(scheduler.select(task)) } @Test @@ -254,11 +254,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -276,11 +276,11 @@ internal class FilterSchedulerTest { scheduler.addHost(host) - val server = mockk() - every { server.flavor.coreCount } returns 8 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 8 + every { task.flavor.memorySize } returns 1024 - assertNull(scheduler.select(server)) + assertNull(scheduler.select(task)) } // TODO: fix when schedulers are reworked @@ -305,12 +305,12 @@ internal class FilterSchedulerTest { scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 - every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0) + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 + every { task.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0) - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -334,11 +334,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -349,10 +349,10 @@ internal class FilterSchedulerTest { weighers = emptyList(), ) - val serverA = mockk() - every { serverA.uid } returns UUID.randomUUID() - every { serverA.flavor.coreCount } returns 2 - every { serverA.flavor.memorySize } returns 1024 + val taskA = mockk() + every { taskA.uid } returns UUID.randomUUID() + every { taskA.flavor.coreCount } returns 2 + every { taskA.flavor.memorySize } returns 1024 val hostA = mockk() every { hostA.host.state } returns HostState.UP @@ -363,22 +363,22 @@ internal class FilterSchedulerTest { val hostB = mockk() every { hostB.host.state } returns HostState.UP every { hostB.host.model } returns HostModel(4 * 2600.0, 1, 4, 2048) - every { hostB.host.instances } returns setOf(serverA) + every { hostB.host.instances } returns setOf(taskA) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) scheduler.addHost(hostB) - val serverB = mockk() - every { serverB.flavor.coreCount } returns 2 - every { serverB.flavor.memorySize } returns 1024 - every { serverB.meta } returns emptyMap() + val taskB = mockk() + every { taskB.flavor.coreCount } returns 2 + every { taskB.flavor.memorySize } returns 1024 + every { taskB.meta } returns emptyMap() - assertEquals(hostA, scheduler.select(serverB)) + assertEquals(hostA, scheduler.select(taskB)) - every { serverB.meta } returns mapOf("scheduler_hint:same_host" to setOf(serverA.uid)) + every { taskB.meta } returns mapOf("scheduler_hint:same_host" to setOf(taskA.uid)) - assertEquals(hostB, scheduler.select(serverB)) + assertEquals(hostB, scheduler.select(taskB)) } @Test @@ -389,15 +389,15 @@ internal class FilterSchedulerTest { weighers = emptyList(), ) - val serverA = mockk() - every { serverA.uid } returns UUID.randomUUID() - every { serverA.flavor.coreCount } returns 2 - every { serverA.flavor.memorySize } returns 1024 + val taskA = mockk() + every { taskA.uid } returns UUID.randomUUID() + every { taskA.flavor.coreCount } returns 2 + every { taskA.flavor.memorySize } returns 1024 val hostA = mockk() every { hostA.host.state } returns HostState.UP every { hostA.host.model } returns HostModel(4 * 2600.0, 1, 4, 2048) - every { hostA.host.instances } returns setOf(serverA) + every { hostA.host.instances } returns setOf(taskA) every { hostA.provisionedCores } returns 3 val hostB = mockk() @@ -409,16 +409,16 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val serverB = mockk() - every { serverB.flavor.coreCount } returns 2 - every { serverB.flavor.memorySize } returns 1024 - every { serverB.meta } returns emptyMap() + val taskB = mockk() + every { taskB.flavor.coreCount } returns 2 + every { taskB.flavor.memorySize } returns 1024 + every { taskB.meta } returns emptyMap() - assertEquals(hostA, scheduler.select(serverB)) + assertEquals(hostA, scheduler.select(taskB)) - every { serverB.meta } returns mapOf("scheduler_hint:different_host" to setOf(serverA.uid)) + every { taskB.meta } returns mapOf("scheduler_hint:different_host" to setOf(taskA.uid)) - assertEquals(hostB, scheduler.select(serverB)) + assertEquals(hostB, scheduler.select(taskB)) } @Test @@ -442,11 +442,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostA, scheduler.select(server)) + assertEquals(hostA, scheduler.select(task)) } // TODO: fix test when updating schedulers @@ -471,11 +471,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -499,11 +499,11 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } @Test @@ -527,10 +527,10 @@ internal class FilterSchedulerTest { scheduler.addHost(hostA) scheduler.addHost(hostB) - val server = mockk() - every { server.flavor.coreCount } returns 2 - every { server.flavor.memorySize } returns 1024 + val task = mockk() + every { task.flavor.coreCount } returns 2 + every { task.flavor.memorySize } returns 1024 - assertEquals(hostB, scheduler.select(server)) + assertEquals(hostB, scheduler.select(task)) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 624a612f..ac0a8043 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -23,8 +23,8 @@ package org.opendc.compute.simulator import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostModel @@ -60,7 +60,7 @@ import java.util.function.Supplier * @param clock The (virtual) clock used to track time. * @param machine The [SimBareMetalMachine] on which the host runs. * @param hypervisor The [SimHypervisor] to run on top of the machine. - * @param mapper A [SimWorkloadMapper] to map a [Server] to a [SimWorkload]. + * @param mapper A [SimWorkloadMapper] to map a [Task] to a [SimWorkload]. * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor. * @param optimize A flag to indicate to optimize the machine models of the virtual machines. */ @@ -83,7 +83,7 @@ public class SimHost( /** * The virtual machines running on the hypervisor. */ - private val guests = HashMap() + private val guests = HashMap() private val localGuests = mutableListOf() private var localState: HostState = HostState.DOWN @@ -108,11 +108,11 @@ public class SimHost( private val guestListener = object : GuestListener { override fun onStart(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } } override fun onStop(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + listeners.forEach { it.onStateChanged(this@SimHost, guest.task, guest.state) } } } @@ -140,21 +140,21 @@ public class SimHost( return localState } - override fun getInstances(): Set { + override fun getInstances(): Set { return guests.keys } - override fun canFit(server: Server): Boolean { - val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize - val enoughCpus = model.coreCount >= server.flavor.coreCount - val canFit = hypervisor.canFit(server.flavor.toMachineModel()) + override fun canFit(task: Task): Boolean { + val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize + val enoughCpus = model.coreCount >= task.flavor.coreCount + val canFit = hypervisor.canFit(task.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit } - override fun spawn(server: Server) { - guests.computeIfAbsent(server) { key -> - require(canFit(key)) { "Server does not fit" } + override fun spawn(task: Task) { + guests.computeIfAbsent(task) { key -> + require(canFit(key)) { "Task does not fit" } val machine = hypervisor.newMachine(key.flavor.toMachineModel()) val newGuest = @@ -164,7 +164,7 @@ public class SimHost( hypervisor, mapper, guestListener, - server, + task, machine, ) @@ -173,25 +173,25 @@ public class SimHost( } } - override fun contains(server: Server): Boolean { - return server in guests + override fun contains(task: Task): Boolean { + return task in guests } - override fun start(server: Server) { - val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + override fun start(task: Task) { + val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } guest.start() } - override fun stop(server: Server) { - val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + override fun stop(task: Task) { + val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } guest.stop() } - override fun delete(server: Server) { - val guest = guests[server] ?: return + override fun delete(task: Task) { + val guest = guests[task] ?: return guest.delete() - guests.remove(server) + guests.remove(task) localGuests.remove(guest) } @@ -219,12 +219,12 @@ public class SimHost( val guests = localGuests.listIterator() for (guest in guests) { when (guest.state) { - ServerState.TERMINATED -> terminated++ - ServerState.RUNNING -> running++ - ServerState.ERROR -> error++ - ServerState.DELETED -> { + TaskState.TERMINATED -> terminated++ + TaskState.RUNNING -> running++ + TaskState.ERROR -> error++ + TaskState.DELETED -> { // Remove guests that have been deleted - this.guests.remove(guest.server) + this.guests.remove(guest.task) guests.remove() } else -> invalid++ @@ -244,8 +244,8 @@ public class SimHost( ) } - override fun getSystemStats(server: Server): GuestSystemStats { - val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + override fun getSystemStats(task: Task): GuestSystemStats { + val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } return guest.getSystemStats() } @@ -265,8 +265,8 @@ public class SimHost( ) } - override fun getCpuStats(server: Server): GuestCpuStats { - val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + override fun getCpuStats(task: Task): GuestCpuStats { + val guest = requireNotNull(guests[task]) { "Unknown task ${task.uid} at host $uid" } return guest.getCpuStats() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt index c05f1a2c..907f6acd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt @@ -22,14 +22,14 @@ package org.opendc.compute.simulator -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.simulator.compute.workload.SimWorkload /** - * A [SimWorkloadMapper] that maps a [Server] to a workload via the meta-data. + * A [SimWorkloadMapper] that maps a [Task] to a workload via the meta-data. */ public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper { - override fun createWorkload(server: Server): SimWorkload { - return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload + override fun createWorkload(task: Task): SimWorkload { + return requireNotNull(task.meta[key] ?: task.image.meta[key]) as SimWorkload } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt index 83baa61a..a85091a0 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt @@ -23,15 +23,15 @@ package org.opendc.compute.simulator import org.opendc.compute.api.Image -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.simulator.compute.workload.SimWorkload /** - * A [SimWorkloadMapper] is responsible for mapping a [Server] and [Image] to a [SimWorkload] that can be simulated. + * A [SimWorkloadMapper] is responsible for mapping a [Task] and [Image] to a [SimWorkload] that can be simulated. */ public fun interface SimWorkloadMapper { /** - * Map the specified [server] to a [SimWorkload] that can be simulated. + * Map the specified [task] to a [SimWorkload] that can be simulated. */ - public fun createWorkload(server: Server): SimWorkload + public fun createWorkload(task: Task): SimWorkload } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt index 7d4b7302..24c382dd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt @@ -22,7 +22,7 @@ package org.opendc.compute.simulator.internal -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.simulator.SimMetaWorkloadMapper import org.opendc.compute.simulator.SimWorkloadMapper import org.opendc.simulator.compute.workload.SimWorkload @@ -36,8 +36,8 @@ import java.time.Duration internal object DefaultWorkloadMapper : SimWorkloadMapper { private val delegate = SimMetaWorkloadMapper() - override fun createWorkload(server: Server): SimWorkload { - val workload = delegate.createWorkload(server) + override fun createWorkload(task: Task): SimWorkload { + val workload = delegate.createWorkload(task) // FIXME: look at connecting this to frontend. Probably not needed since the duration is so small val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(1), 0.8, 0L, 0L) return SimWorkloads.chain(bootWorkload, workload) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index cbdda0cb..1925233f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -23,8 +23,8 @@ package org.opendc.compute.simulator.internal import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState import org.opendc.compute.service.driver.telemetry.GuestCpuStats import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost @@ -46,16 +46,16 @@ internal class Guest( private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper, private val listener: GuestListener, - val server: Server, + val task: Task, val machine: SimVirtualMachine, ) { /** * The state of the [Guest]. * - * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for - * a server. + * [TaskState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a task. */ - var state: ServerState = ServerState.TERMINATED + var state: TaskState = TaskState.TERMINATED private set /** @@ -63,14 +63,14 @@ internal class Guest( */ fun start() { when (state) { - ServerState.TERMINATED, ServerState.ERROR -> { - LOGGER.info { "User requested to start server ${server.uid}" } + TaskState.TERMINATED, TaskState.ERROR -> { + LOGGER.info { "User requested to start task ${task.uid}" } doStart() } - ServerState.RUNNING -> return - ServerState.DELETED -> { - LOGGER.warn { "User tried to start deleted server" } - throw IllegalArgumentException("Server is deleted") + TaskState.RUNNING -> return + TaskState.DELETED -> { + LOGGER.warn { "User tried to start deleted task" } + throw IllegalArgumentException("Task is deleted") } else -> assert(false) { "Invalid state transition" } } @@ -81,9 +81,9 @@ internal class Guest( */ fun stop() { when (state) { - ServerState.RUNNING -> doStop(ServerState.TERMINATED) - ServerState.ERROR -> doRecover() - ServerState.TERMINATED, ServerState.DELETED -> return + TaskState.RUNNING -> doStop(TaskState.TERMINATED) + TaskState.ERROR -> doRecover() + TaskState.TERMINATED, TaskState.DELETED -> return else -> assert(false) { "Invalid state transition" } } } @@ -97,28 +97,28 @@ internal class Guest( fun delete() { stop() - state = ServerState.DELETED + state = TaskState.DELETED hypervisor.removeMachine(machine) } /** * Fail the guest if it is active. * - * This operation forcibly stops the guest and puts the server into an error state. + * This operation forcibly stops the guest and puts the task into an error state. */ fun fail() { - if (state != ServerState.RUNNING) { + if (state != TaskState.RUNNING) { return } - doStop(ServerState.ERROR) + doStop(TaskState.ERROR) } /** * Recover the guest if it is in an error state. */ fun recover() { - if (state != ServerState.ERROR) { + if (state != TaskState.ERROR) { return } @@ -170,23 +170,23 @@ internal class Guest( onStart() - val workload: SimWorkload = mapper.createWorkload(server) + val workload: SimWorkload = mapper.createWorkload(task) workload.setOffset(clock.millis()) - val meta = mapOf("driver" to host, "server" to server) + server.meta + val meta = mapOf("driver" to host, "task" to task) + task.meta ctx = machine.startWorkload(workload, meta) { cause -> - onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED) + onStop(if (cause != null) TaskState.ERROR else TaskState.TERMINATED) ctx = null } } /** - * Attempt to stop the server and put it into [target] state. + * Attempt to stop the task and put it into [target] state. */ - private fun doStop(target: ServerState) { + private fun doStop(target: TaskState) { assert(ctx != null) { "Invalid job state" } val ctx = ctx ?: return - if (target == ServerState.ERROR) { + if (target == TaskState.ERROR) { ctx.shutdown(Exception("Stopped because of ERROR")) } else { ctx.shutdown() @@ -199,7 +199,7 @@ internal class Guest( * Attempt to recover from an error state. */ private fun doRecover() { - state = ServerState.TERMINATED + state = TaskState.TERMINATED } /** @@ -207,14 +207,14 @@ internal class Guest( */ private fun onStart() { localBootTime = clock.instant() - state = ServerState.RUNNING + state = TaskState.RUNNING listener.onStart(this) } /** * This method is invoked when the guest stopped. */ - private fun onStop(target: ServerState) { + private fun onStop(target: TaskState) { updateUptime() state = target @@ -235,9 +235,9 @@ internal class Guest( val duration = now - localLastReport localLastReport = now - if (state == ServerState.RUNNING) { + if (state == TaskState.RUNNING) { localUptime += duration - } else if (state == ServerState.ERROR) { + } else if (state == TaskState.ERROR) { localDowntime += duration } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 8c2ffd0c..ec3bf225 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.api.Flavor import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState +import org.opendc.compute.api.TaskWatcher import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimBareMetalMachine @@ -119,16 +119,16 @@ internal class SimHostTest { override fun onStateChanged( host: Host, - server: Server, - newState: ServerState, + task: Task, + newState: TaskState, ) { - if (newState == ServerState.TERMINATED && ++finished == 1) { + if (newState == TaskState.TERMINATED && ++finished == 1) { cont.resume(Unit) } } }, ) - val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) + val server = MockTask(UUID.randomUUID(), "a", flavor, vmImage) host.spawn(server) host.start(server) } @@ -211,18 +211,18 @@ internal class SimHostTest { override fun onStateChanged( host: Host, - server: Server, - newState: ServerState, + task: Task, + newState: TaskState, ) { - if (newState == ServerState.TERMINATED && ++finished == 2) { + if (newState == TaskState.TERMINATED && ++finished == 2) { cont.resume(Unit) } } }, ) - val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA) + val serverA = MockTask(UUID.randomUUID(), "a", flavor, vmImageA) host.spawn(serverA) - val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) + val serverB = MockTask(UUID.randomUUID(), "b", flavor, vmImageB) host.spawn(serverB) host.start(serverA) @@ -282,7 +282,7 @@ internal class SimHostTest { ), ) val flavor = MockFlavor(2, 0) - val server = MockServer(UUID.randomUUID(), "a", flavor, image) + val server = MockTask(UUID.randomUUID(), "a", flavor, image) coroutineScope { host.spawn(server) @@ -297,10 +297,10 @@ internal class SimHostTest { object : HostListener { override fun onStateChanged( host: Host, - server: Server, - newState: ServerState, + task: Task, + newState: TaskState, ) { - if (newState == ServerState.TERMINATED) { + if (newState == TaskState.TERMINATED) { cont.resume(Unit) } } @@ -360,17 +360,17 @@ internal class SimHostTest { } } - private class MockServer( + private class MockTask( override val uid: UUID, override val name: String, override val flavor: Flavor, override val image: Image, - ) : Server { + ) : Task { override val labels: Map = emptyMap() override val meta: Map = emptyMap() - override val state: ServerState = ServerState.TERMINATED + override val state: TaskState = TaskState.TERMINATED override val launchedAt: Instant? = null @@ -380,9 +380,9 @@ internal class SimHostTest { override fun delete() {} - override fun watch(watcher: ServerWatcher) {} + override fun watch(watcher: TaskWatcher) {} - override fun unwatch(watcher: ServerWatcher) {} + override fun unwatch(watcher: TaskWatcher) {} override fun reload() {} } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt index 0b11b57d..5bd237fd 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -29,15 +29,15 @@ import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.common.Dispatcher import org.opendc.common.asCoroutineDispatcher -import org.opendc.compute.api.Server +import org.opendc.compute.api.Task import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.telemetry.table.HostInfo import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServerInfo -import org.opendc.compute.telemetry.table.ServerTableReader import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.telemetry.table.TaskInfo +import org.opendc.compute.telemetry.table.TaskTableReader import java.time.Duration import java.time.Instant @@ -73,9 +73,9 @@ public class ComputeMetricReader( private val hostTableReaders = mutableMapOf() /** - * Mapping from [Server] instances to [ServerTableReaderImpl] + * Mapping from [Task] instances to [TaskTableReaderImpl] */ - private val serverTableReaders = mutableMapOf() + private val taskTableReaders = mutableMapOf() /** * The background job that is responsible for collecting the metrics every cycle. @@ -109,8 +109,8 @@ public class ComputeMetricReader( reader.reset() } - for (server in this.service.servers) { - val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it, startTime) } + for (task in this.service.tasks) { + val reader = this.taskTableReaders.computeIfAbsent(task) { TaskTableReaderImpl(service, it, startTime) } reader.record(now) this.monitor.record(reader.copy()) reader.reset() @@ -147,9 +147,9 @@ public class ComputeMetricReader( _hostsUp = table.hostsUp _hostsDown = table.hostsDown - _serversTotal = table.serversTotal - _serversPending = table.serversPending - _serversActive = table.serversActive + _tasksTotal = table.tasksTotal + _tasksPending = table.tasksPending + _tasksActive = table.tasksActive _attemptsSuccess = table.attemptsSuccess _attemptsFailure = table.attemptsFailure _attemptsError = table.attemptsError @@ -171,17 +171,17 @@ public class ComputeMetricReader( get() = _hostsDown private var _hostsDown = 0 - override val serversTotal: Int - get() = _serversTotal - private var _serversTotal = 0 + override val tasksTotal: Int + get() = _tasksTotal + private var _tasksTotal = 0 - override val serversPending: Int - get() = _serversPending - private var _serversPending = 0 + override val tasksPending: Int + get() = _tasksPending + private var _tasksPending = 0 - override val serversActive: Int - get() = _serversActive - private var _serversActive = 0 + override val tasksActive: Int + get() = _tasksActive + private var _tasksActive = 0 override val attemptsSuccess: Int get() = _attemptsSuccess @@ -205,9 +205,9 @@ public class ComputeMetricReader( val stats = service.getSchedulerStats() _hostsUp = stats.hostsAvailable _hostsDown = stats.hostsUnavailable - _serversTotal = stats.serversTotal - _serversPending = stats.serversPending - _serversActive = stats.serversActive + _tasksTotal = stats.tasksTotal + _tasksPending = stats.tasksPending + _tasksActive = stats.tasksActive _attemptsSuccess = stats.attemptsSuccess.toInt() _attemptsFailure = stats.attemptsFailure.toInt() _attemptsError = stats.attemptsError.toInt() @@ -418,21 +418,21 @@ public class ComputeMetricReader( } /** - * An aggregator for server metrics before they are reported. + * An aggregator for task metrics before they are reported. */ - private class ServerTableReaderImpl( + private class TaskTableReaderImpl( private val service: ComputeService, - server: Server, + task: Task, private val startTime: Duration = Duration.ofMillis(0), - ) : ServerTableReader { - override fun copy(): ServerTableReader { - val newServerTable = ServerTableReaderImpl(service, _server) - newServerTable.setValues(this) + ) : TaskTableReader { + override fun copy(): TaskTableReader { + val newTaskTable = TaskTableReaderImpl(service, _task) + newTaskTable.setValues(this) - return newServerTable + return newTaskTable } - override fun setValues(table: ServerTableReader) { + override fun setValues(table: TaskTableReader) { host = table.host _timestamp = table.timestamp @@ -450,25 +450,25 @@ public class ComputeMetricReader( _bootTimeAbsolute = table.bootTimeAbsolute } - private val _server = server + private val _task = task /** - * The static information about this server. + * The static information about this task. */ - override val server = - ServerInfo( - server.uid.toString(), - server.name, + override val task = + TaskInfo( + task.uid.toString(), + task.name, "vm", "x86", - server.image.uid.toString(), - server.image.name, - server.flavor.coreCount, - server.flavor.memorySize, + task.image.uid.toString(), + task.image.name, + task.flavor.coreCount, + task.flavor.memorySize, ) /** - * The [HostInfo] of the host on which the server is hosted. + * The [HostInfo] of the host on which the task is hosted. */ override var host: HostInfo? = null private var _host: Host? = null @@ -531,14 +531,14 @@ public class ComputeMetricReader( * Record the next cycle. */ fun record(now: Instant) { - val newHost = service.lookupHost(_server) + val newHost = service.lookupHost(_task) if (newHost != null && newHost.uid != _host?.uid) { _host = newHost host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) } - val cpuStats = _host?.getCpuStats(_server) - val sysStats = _host?.getSystemStats(_server) + val cpuStats = _host?.getCpuStats(_task) + val sysStats = _host?.getSystemStats(_task) _timestamp = now _timestampAbsolute = now + startTime @@ -550,7 +550,7 @@ public class ComputeMetricReader( _cpuLostTime = cpuStats?.lostTime ?: 0 _uptime = sysStats?.uptime?.toMillis() ?: 0 _downtime = sysStats?.downtime?.toMillis() ?: 0 - _provisionTime = _server.launchedAt + _provisionTime = _task.launchedAt _bootTime = sysStats?.bootTime if (sysStats != null) { diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt index b236a7df..1df058fb 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt @@ -23,8 +23,8 @@ package org.opendc.compute.telemetry import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServerTableReader import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.telemetry.table.TaskTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. @@ -33,7 +33,7 @@ public interface ComputeMonitor { /** * Record an entry with the specified [reader]. */ - public fun record(reader: ServerTableReader) {} + public fun record(reader: TaskTableReader) {} /** * Record an entry with the specified [reader]. diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt index 02e3e0bb..161c0936 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt @@ -36,8 +36,8 @@ import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.jsonObject import org.opendc.common.logger.logger import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServerTableReader import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.telemetry.table.TaskTableReader import org.opendc.trace.util.parquet.exporter.ColListSerializer import org.opendc.trace.util.parquet.exporter.ExportColumn import org.opendc.trace.util.parquet.exporter.Exportable @@ -48,22 +48,22 @@ import org.opendc.trace.util.parquet.exporter.columnSerializer * parquet files for compute workloads. * * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. - * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file. + * @param[taskExportColumns] the columns that will be included in the `task.parquet` raw output file. * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. */ @Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) public data class ComputeExportConfig( public val hostExportColumns: Set>, - public val serverExportColumns: Set>, + public val taskExportColumns: Set>, public val serviceExportColumns: Set>, ) { public constructor( hostExportColumns: Collection>, - serverExportColumns: Collection>, + taskExportColumns: Collection>, serviceExportColumns: Collection>, ) : this( hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS, - serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS, + taskExportColumns.toSet() + DfltTaskExportColumns.BASE_EXPORT_COLUMNS, serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS, ) @@ -74,7 +74,7 @@ public data class ComputeExportConfig( """ | === Compute Export Config === | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} - | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')} + | Task columns : ${taskExportColumns.map { it.name }.toString().trim('[', ']')} | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} """.trimIndent() @@ -87,20 +87,20 @@ public data class ComputeExportConfig( */ public fun loadDfltColumns() { DfltHostExportColumns - DfltServerExportColumns + DfltTaskExportColumns DfltServiceExportColumns } /** * Config that includes all columns defined in [DfltHostExportColumns], - * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded - * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader]. + * [DfltTaskExportColumns], [DfltServiceExportColumns] among all other loaded + * columns for [HostTableReader], [TaskTableReader] and [ServiceTableReader]. */ public val ALL_COLUMNS: ComputeExportConfig by lazy { loadDfltColumns() ComputeExportConfig( hostExportColumns = ExportColumn.getAllLoadedColumns(), - serverExportColumns = ExportColumn.getAllLoadedColumns(), + taskExportColumns = ExportColumn.getAllLoadedColumns(), serviceExportColumns = ExportColumn.getAllLoadedColumns(), ) } @@ -118,8 +118,8 @@ public data class ComputeExportConfig( ListSerializer(columnSerializer()).descriptor, ) element( - "serverExportColumns", - ListSerializer(columnSerializer()).descriptor, + "taskExportColumns", + ListSerializer(columnSerializer()).descriptor, ) element( "serviceExportColumns", @@ -139,12 +139,12 @@ public data class ComputeExportConfig( val elem = jsonDec.decodeJsonElement().jsonObject val hostFields: List> = elem["hostExportColumns"].toFieldList() - val serverFields: List> = elem["serverExportColumns"].toFieldList() + val taskFields: List> = elem["taskExportColumns"].toFieldList() val serviceFields: List> = elem["serviceExportColumns"].toFieldList() return ComputeExportConfig( hostExportColumns = hostFields, - serverExportColumns = serverFields, + taskExportColumns = taskFields, serviceExportColumns = serviceFields, ) } @@ -163,8 +163,8 @@ public data class ComputeExportConfig( encodeSerializableElement( descriptor, 1, - ColListSerializer(columnSerializer()), - value.serverExportColumns.toList(), + ColListSerializer(columnSerializer()), + value.taskExportColumns.toList(), ) encodeSerializableElement( descriptor, diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt deleted file mode 100644 index 91d6c9bf..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServerTableReader -import org.opendc.trace.util.parquet.exporter.ExportColumn - -/** - * This object wraps the [ExportColumn]s to solves ambiguity for field - * names that are included in more than 1 exportable - * - * Additionally, it allows to load all the fields at once by just its symbol, - * so that these columns can be deserialized. Additional fields can be added - * from anywhere, and they are deserializable as long as they are loaded by the jvm. - * - * ```kotlin - * ... - * // Loads the column - * DfltServerExportColumns - * ... - * ``` - */ -public object DfltServerExportColumns { - public val TIMESTAMP: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("timestamp"), - ) { it.timestamp.toEpochMilli() } - - public val TIMESTAMP_ABS: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("timestamp_absolute"), - ) { it.timestampAbsolute.toEpochMilli() } - - public val SERVER_ID: ExportColumn = - ExportColumn( - field = - Types.required(BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - ) { Binary.fromString(it.server.id) } - - public val HOST_ID: ExportColumn = - ExportColumn( - field = - Types.optional(BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - ) { it.host?.id?.let { Binary.fromString(it) } } - - public val SERVER_NAME: ExportColumn = - ExportColumn( - field = - Types.required(BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - ) { Binary.fromString(it.server.name) } - - public val CPU_COUNT: ExportColumn = - ExportColumn( - field = Types.required(INT32).named("cpu_count"), - ) { it.server.cpuCount } - - public val MEM_CAPACITY: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("mem_capacity"), - ) { it.server.memCapacity } - - public val CPU_LIMIT: ExportColumn = - ExportColumn( - field = Types.required(DOUBLE).named("cpu_limit"), - ) { it.cpuLimit } - - public val CPU_TIME_ACTIVE: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("cpu_time_active"), - ) { it.cpuActiveTime } - - public val CPU_TIME_IDLE: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("cpu_time_idle"), - ) { it.cpuIdleTime } - - public val CPU_TIME_STEAL: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("cpu_time_steal"), - ) { it.cpuStealTime } - - public val CPU_TIME_LOST: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("cpu_time_lost"), - ) { it.cpuLostTime } - - public val UP_TIME: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("uptime"), - ) { it.uptime } - - public val DOWN_TIME: ExportColumn = - ExportColumn( - field = Types.required(INT64).named("downtime"), - ) { it.downtime } - - public val PROVISION_TIME: ExportColumn = - ExportColumn( - field = Types.optional(INT64).named("provision_time"), - ) { it.provisionTime?.toEpochMilli() } - - public val BOOT_TIME: ExportColumn = - ExportColumn( - field = Types.optional(INT64).named("boot_time"), - ) { it.bootTime?.toEpochMilli() } - - public val BOOT_TIME_ABS: ExportColumn = - ExportColumn( - field = Types.optional(INT64).named("boot_time_absolute"), - ) { it.bootTimeAbsolute?.toEpochMilli() } - - /** - * The columns that are always included in the output file. - */ - internal val BASE_EXPORT_COLUMNS = - setOf( - TIMESTAMP_ABS, - TIMESTAMP, - ) -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt index 89396545..8038060d 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt @@ -59,15 +59,15 @@ public object DfltServiceExportColumns { field = Types.required(INT32).named("hosts_up"), ) { it.hostsUp } - public val SERVERS_PENDING: ExportColumn = + public val TASKS_PENDING: ExportColumn = ExportColumn( - field = Types.required(INT32).named("servers_pending"), - ) { it.serversPending } + field = Types.required(INT32).named("tasks_pending"), + ) { it.tasksPending } - public val SERVERS_ACTIVE: ExportColumn = + public val TASKS_ACTIVE: ExportColumn = ExportColumn( - field = Types.required(INT32).named("servers_active"), - ) { it.serversActive } + field = Types.required(INT32).named("tasks_active"), + ) { it.tasksActive } public val ATTEMPTS_SUCCESS: ExportColumn = ExportColumn( diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt new file mode 100644 index 00000000..5bb7dd1f --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.TaskTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltTaskExportColumns + * ... + * ``` + */ +public object DfltTaskExportColumns { + public val TIMESTAMP: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val TASK_ID: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_id"), + ) { Binary.fromString(it.task.id) } + + public val HOST_ID: ExportColumn = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { it.host?.id?.let { Binary.fromString(it) } } + + public val TASK_NAME: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_name"), + ) { Binary.fromString(it.task.name) } + + public val CPU_COUNT: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.task.cpuCount } + + public val MEM_CAPACITY: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.task.memCapacity } + + public val CPU_LIMIT: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_TIME_ACTIVE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val UP_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val PROVISION_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("provision_time"), + ) { it.provisionTime?.toEpochMilli() } + + public val BOOT_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt index 6bea4cc2..3b7a7c0c 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt @@ -24,8 +24,8 @@ package org.opendc.compute.telemetry.export.parquet import org.opendc.compute.telemetry.ComputeMonitor import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServerTableReader import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.telemetry.table.TaskTableReader import org.opendc.trace.util.parquet.exporter.ExportColumn import org.opendc.trace.util.parquet.exporter.Exportable import org.opendc.trace.util.parquet.exporter.Exporter @@ -36,15 +36,15 @@ import java.io.File */ public class ParquetComputeMonitor( private val hostExporter: Exporter, - private val serverExporter: Exporter, + private val taskExporter: Exporter, private val serviceExporter: Exporter, ) : ComputeMonitor, AutoCloseable { override fun record(reader: HostTableReader) { hostExporter.write(reader) } - override fun record(reader: ServerTableReader) { - serverExporter.write(reader) + override fun record(reader: TaskTableReader) { + taskExporter.write(reader) } override fun record(reader: ServiceTableReader) { @@ -53,7 +53,7 @@ public class ParquetComputeMonitor( override fun close() { hostExporter.close() - serverExporter.close() + taskExporter.close() serviceExporter.close() } @@ -76,13 +76,13 @@ public class ParquetComputeMonitor( partition = partition, bufferSize = bufferSize, hostExportColumns = computeExportConfig.hostExportColumns, - serverExportColumns = computeExportConfig.serverExportColumns, + taskExportColumns = computeExportConfig.taskExportColumns, serviceExportColumns = computeExportConfig.serviceExportColumns, ) /** * Constructor that loads default [ExportColumn]s defined in - * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns] + * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltServiceExportColumns] * in case optional parameters are omitted and all fields need to be retrieved. * * @param[base] parent pathname for output file. @@ -94,7 +94,7 @@ public class ParquetComputeMonitor( partition: String, bufferSize: Int, hostExportColumns: Collection>? = null, - serverExportColumns: Collection>? = null, + taskExportColumns: Collection>? = null, serviceExportColumns: Collection>? = null, ): ParquetComputeMonitor { // Loads the fields in case they need to be retrieved if optional params are omitted. @@ -107,10 +107,10 @@ public class ParquetComputeMonitor( columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, ), - serverExporter = + taskExporter = Exporter( - outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, - columns = serverExportColumns ?: Exportable.getAllLoadedColumns(), + outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() }, + columns = taskExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, ), serviceExporter = diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md index f48bc229..aee63fc9 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md @@ -1,8 +1,8 @@ ### Summary -Added output configuration, that can be defined in the scenario `.json` file, that allows to select which columns are to be included in the raw oputput files `host.parquet`, `server.parquet` and `service.parquet`. +Added output configuration, that can be defined in the scenario `.json` file, that allows to select which columns are to be included in the raw oputput files `host.parquet`, `task.parquet` and `service.parquet`. ### Columns -The 'default' columns are defined in `DfltHostExportcolumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be definied anywhere (`ExportColumn`) and it is going to be deserializable as long as it is loaded by the jvm. +The 'default' columns are defined in `DfltHostExportcolumns`, `DfltTaskExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be definied anywhere (`ExportColumn`) and it is going to be deserializable as long as it is loaded by the jvm. ### Deserialization Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in case-insensitive manner, either with `_` as in the name or with ` ` (blank space). @@ -21,7 +21,7 @@ Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex "type": "object", "properties": { "hostExportColumns": { "type": "array" }, - "serverExportColumns": { "type": "array" } , + "taskExportColumns": { "type": "array" } , "serviceExportColumns": { "type": "array" } , "required": [ /* NONE REQUIRED */ ] } @@ -49,8 +49,8 @@ Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex ... "computeExportConfig": { "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"], - "serverExportColumns": ["invalid-entry2"], - "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"] + "taskExportColumns": ["invalid-entry2"], + "serviceExportColumns": ["timestamp", "tasks_active", "tasks_pending"] }, ... ``` @@ -59,12 +59,12 @@ Each `ExportColumn` has a `Regex`, used for deserialization. If no custom regex // console output 10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring... 10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring... -10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns +10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable TaskTableReader produced empty list, falling back to all loaded columns 10:51:56.584 [INFO] ScenariosSpec - | === Compute Export Config === | Host columns : timestamp, timestamp_absolute, guests_invalid -| Server columns : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute -| Service columns : timestamp, servers_active, servers_pending +| Task columns : timestamp, timestamp_absolute, task_id, task_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute +| Service columns : timestamp, tasks_active, tasks_pending ``` diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt deleted file mode 100644 index fb83bf06..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.table - -/** - * Static information about a server exposed to the telemetry service. - */ -public data class ServerInfo( - val id: String, - val name: String, - val type: String, - val arch: String, - val imageId: String, - val imageName: String, - val cpuCount: Int, - val memCapacity: Long, -) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt deleted file mode 100644 index a1aed778..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.table - -import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns -import org.opendc.trace.util.parquet.exporter.Exportable -import java.time.Instant - -/** - * An interface that is used to read a row of a server trace entry. - */ -public interface ServerTableReader : Exportable { - public fun copy(): ServerTableReader - - public fun setValues(table: ServerTableReader) - - /** - * The timestamp of the current entry of the reader relative to the start of the workload. - */ - public val timestamp: Instant - - /** - * The timestamp of the current entry of the reader. - */ - public val timestampAbsolute: Instant - - /** - * The [ServerInfo] of the server to which the row belongs to. - */ - public val server: ServerInfo - - /** - * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. - */ - public val host: HostInfo? - - /** - * The uptime of the host since last time in ms. - */ - public val uptime: Long - - /** - * The downtime of the host since last time in ms. - */ - public val downtime: Long - - /** - * The [Instant] at which the server was enqueued for the scheduler. - */ - public val provisionTime: Instant? - - /** - * The [Instant] at which the server booted relative to the start of the workload. - */ - public val bootTime: Instant? - - /** - * The [Instant] at which the server booted. - */ - public val bootTimeAbsolute: Instant? - - /** - * The capacity of the CPUs of Host on which the server is running (in MHz). - */ - public val cpuLimit: Double - - /** - * The duration (in seconds) that a CPU was active in the server. - */ - public val cpuActiveTime: Long - - /** - * The duration (in seconds) that a CPU was idle in the server. - */ - public val cpuIdleTime: Long - - /** - * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. - */ - public val cpuStealTime: Long - - /** - * The duration (in seconds) of CPU time that was lost due to interference. - */ - public val cpuLostTime: Long -} - -// Loads the default export fields for deserialization whenever this file is loaded. -private val _ignore = DfltServerExportColumns diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt index ad4b3d49..7a8ba6a7 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt @@ -31,9 +31,9 @@ public data class ServiceData( val timestamp: Instant, val hostsUp: Int, val hostsDown: Int, - val serversTotal: Int, - val serversPending: Int, - val serversActive: Int, + val tasksTotal: Int, + val tasksPending: Int, + val tasksActive: Int, val attemptsSuccess: Int, val attemptsFailure: Int, val attemptsError: Int, @@ -47,9 +47,9 @@ public fun ServiceTableReader.toServiceData(): ServiceData { timestamp, hostsUp, hostsDown, - serversTotal, - serversPending, - serversActive, + tasksTotal, + tasksPending, + tasksActive, attemptsSuccess, attemptsFailure, attemptsError, diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index c3a92fc7..23630fb4 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -54,19 +54,19 @@ public interface ServiceTableReader : Exportable { public val hostsDown: Int /** - * The number of servers that are registered with the compute service. + * The number of tasks that are registered with the compute service. */ - public val serversTotal: Int + public val tasksTotal: Int /** - * The number of servers that are pending to be scheduled. + * The number of tasks that are pending to be scheduled. */ - public val serversPending: Int + public val tasksPending: Int /** - * The number of servers that are currently active. + * The number of tasks that are currently active. */ - public val serversActive: Int + public val tasksActive: Int /** * The scheduling attempts that were successful. diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskInfo.kt new file mode 100644 index 00000000..2d1ae91a --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +/** + * Static information about a task exposed to the telemetry service. + */ +public data class TaskInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long, +) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt new file mode 100644 index 00000000..1e38d5eb --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +import org.opendc.compute.telemetry.export.parquet.DfltTaskExportColumns +import org.opendc.trace.util.parquet.exporter.Exportable +import java.time.Instant + +/** + * An interface that is used to read a row of a task trace entry. + */ +public interface TaskTableReader : Exportable { + public fun copy(): TaskTableReader + + public fun setValues(table: TaskTableReader) + + /** + * The timestamp of the current entry of the reader relative to the start of the workload. + */ + public val timestamp: Instant + + /** + * The timestamp of the current entry of the reader. + */ + public val timestampAbsolute: Instant + + /** + * The [TaskInfo] of the task to which the row belongs to. + */ + public val task: TaskInfo + + /** + * The [HostInfo] of the host on which the task is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the task was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the task booted relative to the start of the workload. + */ + public val bootTime: Instant? + + /** + * The [Instant] at which the task booted. + */ + public val bootTimeAbsolute: Instant? + + /** + * The capacity of the CPUs of Host on which the task is running (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the task. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the task. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} + +// Loads the default export fields for deserialization whenever this file is loaded. +private val _ignore = DfltTaskExportColumns diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt index 8723f88b..dd46c7c5 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -33,7 +33,7 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload * Construct a workload from a trace. */ public fun trace( - name: String, + name: String = "", format: String = "opendc-vm", ): ComputeWorkload = TraceComputeWorkload(name, format) -- cgit v1.2.3