From b3a271794d64bd97ef93abf650137c5a0a1785df Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 9 Mar 2021 17:57:57 +0100 Subject: compute: Extend capabilities of compute API --- .../kotlin/org/opendc/compute/api/ComputeClient.kt | 14 ++ .../api/InsufficientServerCapacityException.kt | 29 +++ .../compute/service/internal/ComputeServiceImpl.kt | 246 +++++++++------------ .../compute/service/internal/InternalServer.kt | 126 +++++++++++ .../kotlin/org/opendc/compute/simulator/SimHost.kt | 2 +- 5 files changed, 278 insertions(+), 139 deletions(-) create mode 100644 simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt create mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt (limited to 'simulator/opendc-compute') diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index 8ae08284..4fd32f98 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -22,10 +22,24 @@ package org.opendc.compute.api +import java.util.UUID + /** * A client interface for the OpenDC Compute service. */ public interface ComputeClient : AutoCloseable { + /** + * Obtain the list of [Server]s accessible by the requesting user. + */ + public suspend fun queryServers(): List + + /** + * Obtain a [Server] by its unique identifier. + * + * @param id The identifier of the server. + */ + public suspend fun findServer(id: UUID): Server? + /** * Create a new [Server] instance at this compute service. * diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt new file mode 100644 index 00000000..8fbb7308 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to + * fulfill a launch request. + */ +public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 0d4f379d..3feb80ad 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,10 +22,8 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService @@ -84,13 +82,18 @@ public class ComputeServiceImpl( /** * The servers that should be launched by the service. */ - private val queue: Deque = ArrayDeque() + private val queue: Deque = ArrayDeque() /** * The active servers in the system. */ private val activeServers: MutableMap = mutableMapOf() + /** + * The registered servers for this compute service. + */ + private val servers = mutableMapOf() + public var submittedVms: Int = 0 public var queuedVms: Int = 0 public var runningVms: Int = 0 @@ -147,7 +150,8 @@ public class ComputeServiceImpl( ) ) - val server = ServerImpl( + val server = InternalServer( + this@ComputeServiceImpl, uid = UUID(random.nextLong(), random.nextLong()), name, flavor, @@ -162,6 +166,18 @@ public class ComputeServiceImpl( return ClientServer(server) } + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } + + return servers[id]?.let { ClientServer(it) } + } + + override suspend fun queryServers(): List { + check(!isClosed) { "Client is already closed" } + + return servers.values.map { ClientServer(it) } + } + override fun close() { isClosed = true } @@ -195,9 +211,19 @@ public class ComputeServiceImpl( scope.cancel() } - private fun requestCycle() { - // Bail out in case we have already requested a new cycle. - if (scheduler.isTimerActive(Unit)) { + internal fun schedule(server: InternalServer) { + logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + + queue.add(SchedulingRequest(server)) + requestSchedulingCycle() + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private fun requestSchedulingCycle() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { return } @@ -207,20 +233,28 @@ public class ComputeServiceImpl( val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) scheduler.startSingleTimer(Unit, delay) { - schedule() + doSchedule() } } - private fun schedule() { + /** + * Run a single scheduling iteration. + */ + private fun doSchedule() { while (queue.isNotEmpty()) { - val (server) = queue.peekFirst() - val requiredMemory = server.flavor.memorySize - val selectedHv = allocationLogic.select(availableHosts, server) + val request = queue.peek() + + if (request.isCancelled) { + queue.poll() + continue + } - if (selectedHv == null || !selectedHv.host.canFit(server)) { + val server = request.server + val hv = allocationLogic.select(availableHosts, request.server) + if (hv == null || !hv.host.canFit(server)) { logger.trace { "Server $server selected for scheduling but no capacity available for it." } - if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) _events.emit( @@ -246,44 +280,62 @@ public class ComputeServiceImpl( } } - logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } - queue.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += server.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + val host = hv.host - scope.launch { - try { - selectedHv.host.spawn(server) - activeServers[server] = selectedHv.host + // Remove request from queue + queue.poll() - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms + logger.info { "Assigned server $server to host $host." } + try { + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.assignHost(host) + host.spawn(server) + activeServers[server] = host + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) ) - ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= server.flavor.cpuCount - selectedHv.availableMemory += requiredMemory + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize + } } + } catch (e: Exception) { + logger.warn(e) { "Failed to assign server $server to $host. " } } } } + /** + * A request to schedule an [InternalServer] onto one of the [Host]s. + */ + private data class SchedulingRequest(val server: InternalServer) { + /** + * A flag to indicate that the request is cancelled. + */ + var isCancelled: Boolean = false + } + override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { @@ -311,9 +363,7 @@ public class ComputeServiceImpl( ) // Re-schedule on the new machine - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } HostState.DOWN -> { logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } @@ -336,16 +386,21 @@ public class ComputeServiceImpl( ) ) - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } } override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - val serverImpl = server as ServerImpl - serverImpl.state = newState + require(server is InternalServer) { "Invalid server type passed to service" } + + if (server.host != host) { + // This can happen when a server is rescheduled and started on another machine, while being deleted from + // the old machine. + return + } + + server.state = newState if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } @@ -376,92 +431,7 @@ public class ComputeServiceImpl( } // Try to reschedule if needed - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } - - private data class LaunchRequest(val server: ServerImpl) - - private inner class ServerImpl( - override val uid: UUID, - override val name: String, - override val flavor: Flavor, - override val image: Image, - override val labels: MutableMap, - override val meta: MutableMap - ) : Server { - val watchers = mutableListOf() - - override suspend fun start() { - when (state) { - ServerState.RUNNING -> { - logger.debug { "User tried to start server but server is already running" } - return - } - ServerState.PROVISIONING -> { - logger.debug { "User tried to start server but request is already pending: doing nothing" } - return - } - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> { - logger.info { "User requested to start server $uid" } - state = ServerState.PROVISIONING - val request = LaunchRequest(this) - queue += request - requestCycle() - } - } - } - - override suspend fun stop() { - when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these - ServerState.RUNNING, ServerState.ERROR -> { - // Warn: possible race condition on activeServers - val host = checkNotNull(activeServers[this]) { "Server not running" } - host.stop(this) - } - ServerState.TERMINATED -> {} // No work needed - ServerState.DELETED -> throw IllegalStateException("Server is terminated") - } - } - - override suspend fun delete() { - when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these - ServerState.RUNNING -> { - // Warn: possible race condition on activeServers - val host = checkNotNull(activeServers[this]) { "Server not running" } - host.delete(this) - } - else -> {} // No work needed - } - } - - override fun watch(watcher: ServerWatcher) { - watchers += watcher - } - - override fun unwatch(watcher: ServerWatcher) { - watchers -= watcher - } - - override suspend fun refresh() { - // No-op: this object is the source-of-truth - } - - override var state: ServerState = ServerState.TERMINATED - set(value) { - if (value != field) { - watchers.forEach { it.onStateChanged(this, value) } - } - - field = value - } - } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt new file mode 100644 index 00000000..2656a488 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.compute.service.driver.Host +import java.util.UUID + +/** + * Internal implementation of the [Server] interface. + */ +internal class InternalServer( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image, + override val labels: MutableMap, + override val meta: MutableMap +) : Server { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The watchers of this server object. + */ + private val watchers = mutableListOf() + + /** + * The [Host] that has been assigned to host the server. + */ + internal var host: Host? = null + + override suspend fun start() { + when (state) { + ServerState.RUNNING -> { + logger.debug { "User tried to start server but server is already running" } + return + } + ServerState.PROVISIONING -> { + logger.debug { "User tried to start server but request is already pending: doing nothing" } + return + } + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> { + logger.info { "User requested to start server $uid" } + state = ServerState.PROVISIONING + service.schedule(this) + } + } + } + + override suspend fun stop() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING, ServerState.ERROR -> { + val host = checkNotNull(host) { "Server not running" } + host.stop(this) + } + ServerState.TERMINATED -> {} // No work needed + ServerState.DELETED -> throw IllegalStateException("Server is terminated") + } + } + + override suspend fun delete() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING -> { + val host = checkNotNull(host) { "Server not running" } + host.delete(this) + } + else -> {} // No work needed + } + } + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override var state: ServerState = ServerState.TERMINATED + set(value) { + if (value != field) { + watchers.forEach { it.onStateChanged(this, value) } + } + + field = value + } + + internal fun assignHost(host: Host) { + this.host = host + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b18964a8..19fa3e97 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -255,7 +255,7 @@ public class SimHost( * A virtual machine instance that the driver manages. */ private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? var state: ServerState = ServerState.TERMINATED -- cgit v1.2.3