summary | refs | log | tree | commit | diff
path: root/simulator
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-03-09 17:57:57 +0100
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-03-09 17:57:57 +0100
commit: b3a271794d64bd97ef93abf650137c5a0a1785df (patch)
tree: 9a01a045a0feb950df250dbdd3d672705b552e30 /simulator
parent: 79f37dd7d5184d2b8bf7cbe08da00785e4daed02 (diff)
compute: Extend capabilities of compute API
Diffstat (limited to 'simulator')
-rw-r--r-- simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt | 14
-rw-r--r-- simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt | 29
-rw-r--r-- simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt | 246
-rw-r--r-- simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt | 126
-rw-r--r-- simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 2
5 files changed, 278 insertions, 139 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index 8ae08284..4fd32f98 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -22,11 +22,25 @@
package org.opendc.compute.api
+import java.util.UUID
+
/**
* A client interface for the OpenDC Compute service.
*/
public interface ComputeClient : AutoCloseable {
/**
+ * Obtain the list of [Server]s accessible by the requesting user.
+ */
+ public suspend fun queryServers(): List<Server>
+
+ /**
+ * Obtain a [Server] by its unique identifier.
+ *
+ * @param id The identifier of the server.
+ */
+ public suspend fun findServer(id: UUID): Server?
+
+ /**
* Create a new [Server] instance at this compute service.
*
* @param name The name of the server to deploy.
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt
new file mode 100644
index 00000000..8fbb7308
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.api
+
+/**
+ * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to
+ * fulfill a launch request.
+ *
+ * The exception message is fixed; only the cause can be supplied by the thrower.
+ *
+ * @property cause The optional underlying cause of this exception; defaults to `null`.
+ */
+public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request")
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 0d4f379d..3feb80ad 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,10 +22,8 @@
package org.opendc.compute.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.cancel
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
@@ -84,13 +82,18 @@ public class ComputeServiceImpl(
/**
* The servers that should be launched by the service.
*/
- private val queue: Deque<LaunchRequest> = ArrayDeque()
+ private val queue: Deque<SchedulingRequest> = ArrayDeque()
/**
* The active servers in the system.
*/
private val activeServers: MutableMap<Server, Host> = mutableMapOf()
+ /**
+ * The registered servers for this compute service.
+ */
+ private val servers = mutableMapOf<UUID, InternalServer>()
+
public var submittedVms: Int = 0
public var queuedVms: Int = 0
public var runningVms: Int = 0
@@ -147,7 +150,8 @@ public class ComputeServiceImpl(
)
)
- val server = ServerImpl(
+ val server = InternalServer(
+ this@ComputeServiceImpl,
uid = UUID(random.nextLong(), random.nextLong()),
name,
flavor,
@@ -162,6 +166,18 @@ public class ComputeServiceImpl(
return ClientServer(server)
}
+ /**
+ * Look up a server in [servers] by its unique identifier.
+ *
+ * @param id The identifier of the server to look up.
+ * @return A new [ClientServer] wrapping the matching server, or `null` if [servers] has no entry for [id].
+ * @throws IllegalStateException if this client has already been closed.
+ */
+ override suspend fun findServer(id: UUID): Server? {
+ check(!isClosed) { "Client is already closed" }
+
+ return servers[id]?.let { ClientServer(it) }
+ }
+
+ /**
+ * Obtain all servers currently held in [servers].
+ *
+ * @return A list with one newly created [ClientServer] wrapper per entry in [servers].
+ * @throws IllegalStateException if this client has already been closed.
+ */
+ override suspend fun queryServers(): List<Server> {
+ check(!isClosed) { "Client is already closed" }
+
+ return servers.values.map { ClientServer(it) }
+ }
+
override fun close() {
isClosed = true
}
@@ -195,9 +211,19 @@ public class ComputeServiceImpl(
scope.cancel()
}
- private fun requestCycle() {
- // Bail out in case we have already requested a new cycle.
- if (scheduler.isTimerActive(Unit)) {
+ /**
+ * Enqueue the specified [server] for assignment to a host and request a scheduling cycle.
+ *
+ * The server is wrapped in a new [SchedulingRequest] that is appended to the end of [queue].
+ *
+ * @param server The server to schedule.
+ */
+ internal fun schedule(server: InternalServer) {
+ logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
+
+ queue.add(SchedulingRequest(server))
+ requestSchedulingCycle()
+ }
+
+ /**
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
+ */
+ private fun requestSchedulingCycle() {
+ // Bail out in case we have already requested a new cycle or the queue is empty.
+ if (scheduler.isTimerActive(Unit) || queue.isEmpty()) {
return
}
@@ -207,20 +233,28 @@ public class ComputeServiceImpl(
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
scheduler.startSingleTimer(Unit, delay) {
- schedule()
+ doSchedule()
}
}
- private fun schedule() {
+ /**
+ * Run a single scheduling iteration.
+ */
+ private fun doSchedule() {
while (queue.isNotEmpty()) {
- val (server) = queue.peekFirst()
- val requiredMemory = server.flavor.memorySize
- val selectedHv = allocationLogic.select(availableHosts, server)
+ val request = queue.peek()
+
+ if (request.isCancelled) {
+ queue.poll()
+ continue
+ }
- if (selectedHv == null || !selectedHv.host.canFit(server)) {
+ val server = request.server
+ val hv = allocationLogic.select(availableHosts, request.server)
+ if (hv == null || !hv.host.canFit(server)) {
logger.trace { "Server $server selected for scheduling but no capacity available for it." }
- if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) {
+ if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
tracer.commit(VmSubmissionInvalidEvent(server.name))
_events.emit(
@@ -246,44 +280,62 @@ public class ComputeServiceImpl(
}
}
- logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" }
- queue.poll()
-
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- selectedHv.numberOfActiveServers++
- selectedHv.provisionedCores += server.flavor.cpuCount
- selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
+ val host = hv.host
- scope.launch {
- try {
- selectedHv.host.spawn(server)
- activeServers[server] = selectedHv.host
+ // Remove request from queue
+ queue.poll()
- tracer.commit(VmScheduledEvent(server.name))
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
+ logger.info { "Assigned server $server to host $host." }
+ try {
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ hv.numberOfActiveServers++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+
+ scope.launch {
+ try {
+ server.assignHost(host)
+ host.spawn(server)
+ activeServers[server] = host
+
+ tracer.commit(VmScheduledEvent(server.name))
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
)
- )
- } catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ } catch (e: Throwable) {
+ logger.error("Failed to deploy VM", e)
- selectedHv.numberOfActiveServers--
- selectedHv.provisionedCores -= server.flavor.cpuCount
- selectedHv.availableMemory += requiredMemory
+ hv.numberOfActiveServers--
+ hv.provisionedCores -= server.flavor.cpuCount
+ hv.availableMemory += server.flavor.memorySize
+ }
}
+ } catch (e: Exception) {
+ logger.warn(e) { "Failed to assign server $server to $host. " }
}
}
}
+ /**
+ * A request to schedule an [InternalServer] onto one of the [Host]s.
+ *
+ * @property server The server that is waiting to be assigned to a host.
+ */
+ private data class SchedulingRequest(val server: InternalServer) {
+ /**
+ * A flag to indicate that the request is cancelled.
+ *
+ * The scheduling loop removes a cancelled request from the queue without placing its server. Because the flag is
+ * declared in the class body, it is not part of the generated `equals`/`hashCode`/`copy` of this data class.
+ */
+ var isCancelled: Boolean = false
+ }
+
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
@@ -311,9 +363,7 @@ public class ComputeServiceImpl(
)
// Re-schedule on the new machine
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
HostState.DOWN -> {
logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
@@ -336,16 +386,21 @@ public class ComputeServiceImpl(
)
)
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
}
}
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
- val serverImpl = server as ServerImpl
- serverImpl.state = newState
+ require(server is InternalServer) { "Invalid server type passed to service" }
+
+ if (server.host != host) {
+ // This can happen when a server is rescheduled and started on another machine, while being deleted from
+ // the old machine.
+ return
+ }
+
+ server.state = newState
if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
@@ -376,92 +431,7 @@ public class ComputeServiceImpl(
}
// Try to reschedule if needed
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
}
-
- private data class LaunchRequest(val server: ServerImpl)
-
- private inner class ServerImpl(
- override val uid: UUID,
- override val name: String,
- override val flavor: Flavor,
- override val image: Image,
- override val labels: MutableMap<String, String>,
- override val meta: MutableMap<String, Any>
- ) : Server {
- val watchers = mutableListOf<ServerWatcher>()
-
- override suspend fun start() {
- when (state) {
- ServerState.RUNNING -> {
- logger.debug { "User tried to start server but server is already running" }
- return
- }
- ServerState.PROVISIONING -> {
- logger.debug { "User tried to start server but request is already pending: doing nothing" }
- return
- }
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> {
- logger.info { "User requested to start server $uid" }
- state = ServerState.PROVISIONING
- val request = LaunchRequest(this)
- queue += request
- requestCycle()
- }
- }
- }
-
- override suspend fun stop() {
- when (state) {
- ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
- ServerState.RUNNING, ServerState.ERROR -> {
- // Warn: possible race condition on activeServers
- val host = checkNotNull(activeServers[this]) { "Server not running" }
- host.stop(this)
- }
- ServerState.TERMINATED -> {} // No work needed
- ServerState.DELETED -> throw IllegalStateException("Server is terminated")
- }
- }
-
- override suspend fun delete() {
- when (state) {
- ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
- ServerState.RUNNING -> {
- // Warn: possible race condition on activeServers
- val host = checkNotNull(activeServers[this]) { "Server not running" }
- host.delete(this)
- }
- else -> {} // No work needed
- }
- }
-
- override fun watch(watcher: ServerWatcher) {
- watchers += watcher
- }
-
- override fun unwatch(watcher: ServerWatcher) {
- watchers -= watcher
- }
-
- override suspend fun refresh() {
- // No-op: this object is the source-of-truth
- }
-
- override var state: ServerState = ServerState.TERMINATED
- set(value) {
- if (value != field) {
- watchers.forEach { it.onStateChanged(this, value) }
- }
-
- field = value
- }
- }
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
new file mode 100644
index 00000000..2656a488
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.api.*
+import org.opendc.compute.service.driver.Host
+import java.util.UUID
+
+/**
+ * Internal implementation of the [Server] interface.
+ *
+ * The object keeps the server's [state] and assigned [host] itself and hands start requests to [service] for
+ * scheduling.
+ *
+ * @param service The [ComputeServiceImpl] that is asked to schedule this server when it is started.
+ */
+internal class InternalServer(
+ private val service: ComputeServiceImpl,
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image,
+ override val labels: MutableMap<String, String>,
+ override val meta: MutableMap<String, Any>
+) : Server {
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The watchers of this server object.
+ */
+ private val watchers = mutableListOf<ServerWatcher>()
+
+ /**
+ * The [Host] that has been assigned to host the server, or `null` if no host has been assigned yet.
+ */
+ internal var host: Host? = null
+
+ /**
+ * Request the server to be started.
+ *
+ * Nothing happens when the server is already [ServerState.RUNNING] or [ServerState.PROVISIONING]. In any other
+ * state except [ServerState.DELETED], the server moves to [ServerState.PROVISIONING] and is passed to the
+ * service for scheduling.
+ *
+ * @throws IllegalArgumentException if the server is in state [ServerState.DELETED].
+ */
+ override suspend fun start() {
+ when (state) {
+ ServerState.RUNNING -> {
+ logger.debug { "User tried to start server but server is already running" }
+ return
+ }
+ ServerState.PROVISIONING -> {
+ logger.debug { "User tried to start server but request is already pending: doing nothing" }
+ return
+ }
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> {
+ logger.info { "User requested to start server $uid" }
+ // Mark the request as pending before enqueueing, so repeated start() calls hit the PROVISIONING branch.
+ state = ServerState.PROVISIONING
+ service.schedule(this)
+ }
+ }
+ }
+
+ /**
+ * Request the server to be stopped.
+ *
+ * In state [ServerState.RUNNING] or [ServerState.ERROR] the request is forwarded to the assigned [host]. In
+ * state [ServerState.PROVISIONING] or [ServerState.TERMINATED] nothing happens.
+ *
+ * @throws IllegalStateException if the server is in state [ServerState.DELETED], or if it is running or in error
+ * while no host is assigned.
+ */
+ override suspend fun stop() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING, ServerState.ERROR -> {
+ val host = checkNotNull(host) { "Server not running" }
+ host.stop(this)
+ }
+ ServerState.TERMINATED -> {} // No work needed
+ ServerState.DELETED -> throw IllegalStateException("Server is terminated")
+ }
+ }
+
+ /**
+ * Request the server to be deleted.
+ *
+ * Only a [ServerState.RUNNING] server is forwarded to its assigned [host] for deletion; in every other state
+ * this method does nothing.
+ *
+ * @throws IllegalStateException if the server is running while no host is assigned.
+ */
+ override suspend fun delete() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING -> {
+ val host = checkNotNull(host) { "Server not running" }
+ host.delete(this)
+ }
+ else -> {} // No work needed
+ }
+ }
+
+ /**
+ * Register the specified [watcher] to be notified of state changes of this server.
+ */
+ override fun watch(watcher: ServerWatcher) {
+ watchers += watcher
+ }
+
+ /**
+ * De-register the specified [watcher] so that it is no longer notified of state changes.
+ */
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers -= watcher
+ }
+
+ /**
+ * Refresh the local view of the server; a no-op for this implementation.
+ */
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ /**
+ * The current state of the server, initially [ServerState.TERMINATED].
+ *
+ * Assigning a value that differs from the current one notifies every registered watcher with the new value.
+ */
+ override var state: ServerState = ServerState.TERMINATED
+ set(value) {
+ if (value != field) {
+ // Watchers are invoked before the backing field is updated, so reading `state` from within a watcher
+ // still yields the previous value.
+ watchers.forEach { it.onStateChanged(this, value) }
+ }
+
+ field = value
+ }
+
+ /**
+ * Record the specified [host] as the host assigned to this server.
+ */
+ internal fun assignHost(host: Host) {
+ this.host = host
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index b18964a8..19fa3e97 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -255,7 +255,7 @@ public class SimHost(
* A virtual machine instance that the driver manages.
*/
private inner class Guest(val server: Server, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
var state: ServerState = ServerState.TERMINATED