summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-09 12:57:12 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-09 13:03:20 +0100
commit970f5c6f653c8442ecd9b73b208a53a2dbb9a150 (patch)
treeaa5724ee239cb831e48d45aef8327d7b487e2ad1
parente97774dbf274fcb57b9d173f9d674a2ef1b982af (diff)
compute: Add lifecycle methods for Server instances
This change adds more methods for controlling the lifecycle of Server instances.
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt15
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt18
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt15
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt99
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt33
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt8
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt12
9 files changed, 162 insertions, 48 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index 025513e6..c7c507ed 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -32,11 +32,13 @@ public interface ComputeClient : AutoCloseable {
* @param name The name of the server to deploy.
* @param image The image to be deployed.
* @param flavor The flavor of the machine instance to run this [image] on.
+ * @param start A flag to indicate that the server should be started immediately.
*/
public suspend fun newServer(
name: String,
image: Image,
- flavor: Flavor
+ flavor: Flavor,
+ start: Boolean = true
): Server
/**
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
index ab1eb860..a4f61c03 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
@@ -54,6 +54,21 @@ public interface Server : Resource {
public val state: ServerState
/**
+ * Start the server.
+ */
+ public suspend fun start()
+
+ /**
+ * Stop the server.
+ */
+ public suspend fun stop()
+
+ /**
+ * Delete the server instance from the service.
+ */
+ public suspend fun delete()
+
+ /**
* Register the specified [ServerWatcher] to watch the state of the server.
*
* @param watcher The watcher to register for the server.
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
index 25d2e519..a4d7d7d7 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
@@ -27,27 +27,27 @@ package org.opendc.compute.api
*/
public enum class ServerState {
/**
- * The server has not yet finished the original build process.
+ * Resources are being allocated for the instance. The instance is not running yet.
*/
- BUILD,
+ PROVISIONING,
/**
- * The server was powered down by the user.
+ * A user shut down the instance.
*/
- SHUTOFF,
+ TERMINATED,
/**
- * The server is active and running.
+ * The server instance is booting up or running.
*/
- ACTIVE,
+ RUNNING,
/**
- * The server is in error.
+ * The server is in an error state.
*/
ERROR,
/**
- * The state of the server is unknown.
+ * The server has been deleted and cannot be started later on.
*/
- UNKNOWN,
+ DELETED,
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index 060c35fd..c3c39572 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -79,7 +79,7 @@ public interface Host {
public operator fun contains(server: Server): Boolean
/**
- * Stat the server [instance][server] if it is currently not running on this host.
+ * Start the server [instance][server] if it is currently not running on this host.
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
@@ -93,9 +93,9 @@ public interface Host {
public suspend fun stop(server: Server)
/**
- * Terminate the specified [instance][server] on this host and cleanup all resources associated with it.
+ * Delete the specified [instance][server] on this host and cleanup all resources associated with it.
*/
- public suspend fun terminate(server: Server)
+ public suspend fun delete(server: Server)
/**
* Add a [HostListener] to this host.
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index f84b7435..e65c5f1c 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -52,6 +52,21 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
override var state: ServerState = delegate.state
private set
+ override suspend fun start() {
+ delegate.start()
+ refresh()
+ }
+
+ override suspend fun stop() {
+ delegate.stop()
+ refresh()
+ }
+
+ override suspend fun delete() {
+ delegate.delete()
+ refresh()
+ }
+
override fun watch(watcher: ServerWatcher) {
if (watchers.isEmpty()) {
delegate.watch(this)
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 69d6bb59..453f5d65 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -92,7 +92,7 @@ public class ComputeServiceImpl(
/**
* The active servers in the system.
*/
- private val activeServers: MutableSet<Server> = mutableSetOf()
+ private val activeServers: MutableMap<Server, Host> = mutableMapOf()
public var submittedVms: Int = 0
public var queuedVms: Int = 0
@@ -126,7 +126,12 @@ public class ComputeServiceImpl(
override fun newClient(): ComputeClient = object : ComputeClient {
private var isClosed: Boolean = false
- override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server {
+ override suspend fun newServer(
+ name: String,
+ image: Image,
+ flavor: Flavor,
+ start: Boolean
+ ): Server {
check(!isClosed) { "Client is closed" }
tracer.commit(VmSubmissionEvent(name, image, flavor))
@@ -143,11 +148,12 @@ public class ComputeServiceImpl(
)
)
- return suspendCancellableCoroutine { cont ->
- val request = LaunchRequest(createServer(name, image, flavor), cont)
- queue += request
- requestCycle()
+ val server = createServer(name, image, flavor)
+ if (start) {
+ server.start()
}
+
+ return ClientServer(server)
}
override fun close() {
@@ -186,13 +192,13 @@ public class ComputeServiceImpl(
private fun createServer(
name: String,
image: Image,
- flavor: Flavor
- ): Server {
+ flavor: Flavor,
+ ): ServerImpl {
return ServerImpl(
uid = UUID(random.nextLong(), random.nextLong()),
- name = name,
- flavor = flavor,
- image = image
+ name,
+ flavor,
+ image
)
}
@@ -258,9 +264,9 @@ public class ComputeServiceImpl(
scope.launch {
try {
- cont.resume(ClientServer(server))
selectedHv.host.spawn(server)
- activeServers += server
+ cont.resume(Unit)
+ activeServers[server] = selectedHv.host
tracer.commit(VmScheduledEvent(server.name))
_events.emit(
@@ -348,9 +354,8 @@ public class ComputeServiceImpl(
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
val serverImpl = server as ServerImpl
serverImpl.state = newState
- serverImpl.watchers.forEach { it.onStateChanged(server, newState) }
- if (newState == ServerState.SHUTOFF) {
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
tracer.commit(VmStoppedEvent(server.name))
@@ -385,9 +390,9 @@ public class ComputeServiceImpl(
}
}
- public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
+ private data class LaunchRequest(val server: ServerImpl, val cont: Continuation<Unit>)
- private class ServerImpl(
+ private inner class ServerImpl(
override val uid: UUID,
override val name: String,
override val flavor: Flavor,
@@ -395,6 +400,57 @@ public class ComputeServiceImpl(
) : Server {
val watchers = mutableListOf<ServerWatcher>()
+ override suspend fun start() {
+ when (state) {
+ ServerState.RUNNING -> {
+ logger.debug { "User tried to start server but server is already running" }
+ return
+ }
+ ServerState.PROVISIONING -> {
+ logger.debug { "User tried to start server but request is already pending: doing nothing" }
+ return
+ }
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> {
+ logger.info { "User requested to start server $uid" }
+ state = ServerState.PROVISIONING
+ suspendCancellableCoroutine<Unit> { cont ->
+ val request = LaunchRequest(this, cont)
+ queue += request
+ requestCycle()
+ }
+ }
+ }
+ }
+
+ override suspend fun stop() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING, ServerState.ERROR -> {
+ // Warn: possible race condition on activeServers
+ val host = checkNotNull(activeServers[this]) { "Server not running" }
+ host.stop(this)
+ }
+ ServerState.TERMINATED -> {} // No work needed
+ ServerState.DELETED -> throw IllegalStateException("Server is terminated")
+ }
+ }
+
+ override suspend fun delete() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING -> {
+ // Warn: possible race condition on activeServers
+ val host = checkNotNull(activeServers[this]) { "Server not running" }
+ host.delete(this)
+ }
+ else -> {} // No work needed
+ }
+ }
+
override fun watch(watcher: ServerWatcher) {
watchers += watcher
}
@@ -409,6 +465,13 @@ public class ComputeServiceImpl(
override val tags: Map<String, String> = emptyMap()
- override var state: ServerState = ServerState.BUILD
+ override var state: ServerState = ServerState.TERMINATED
+ set(value) {
+ if (value != field) {
+ watchers.forEach { it.onStateChanged(this, value) }
+ }
+
+ field = value
+ }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index e2d70dce..fb55244b 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -125,7 +125,9 @@ public class SimHost(
get() = _state
private var _state: HostState = HostState.DOWN
set(value) {
- listeners.forEach { it.onStateChanged(this, value) }
+ if (value != field) {
+ listeners.forEach { it.onStateChanged(this, value) }
+ }
field = value
}
@@ -172,7 +174,7 @@ public class SimHost(
guest.start()
}
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -189,7 +191,7 @@ public class SimHost(
guest.stop()
}
- override suspend fun terminate(server: Server) {
+ override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
}
@@ -221,7 +223,7 @@ public class SimHost(
private fun onGuestStart(vm: Guest) {
guests.forEach { _, guest ->
- if (guest.state == ServerState.ACTIVE) {
+ if (guest.state == ServerState.RUNNING) {
vm.performanceInterferenceModel?.onStart(vm.server.image.name)
}
}
@@ -231,14 +233,14 @@ public class SimHost(
private fun onGuestStop(vm: Guest) {
guests.forEach { _, guest ->
- if (guest.state == ServerState.ACTIVE) {
+ if (guest.state == ServerState.RUNNING) {
vm.performanceInterferenceModel?.onStop(vm.server.image.name)
}
}
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override suspend fun fail() {
@@ -255,33 +257,38 @@ public class SimHost(
private inner class Guest(val server: Server, val machine: SimMachine) {
val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
- var state: ServerState = ServerState.SHUTOFF
+ var state: ServerState = ServerState.TERMINATED
suspend fun start() {
when (state) {
- ServerState.SHUTOFF -> {
+ ServerState.TERMINATED -> {
logger.info { "User requested to start server ${server.uid}" }
launch()
}
- ServerState.ACTIVE -> return
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
else -> assert(false) { "Invalid state transition" }
}
}
suspend fun stop() {
when (state) {
- ServerState.ACTIVE, ServerState.ERROR -> {
+ ServerState.RUNNING, ServerState.ERROR -> {
val job = job ?: throw IllegalStateException("Server should be active")
job.cancel()
job.join()
}
- ServerState.SHUTOFF -> return
+ ServerState.TERMINATED, ServerState.DELETED -> return
else -> assert(false) { "Invalid state transition" }
}
}
suspend fun terminate() {
stop()
+ state = ServerState.DELETED
}
private var job: Job? = null
@@ -310,14 +317,14 @@ public class SimHost(
}
private fun init() {
- state = ServerState.ACTIVE
+ state = ServerState.RUNNING
onGuestStart(this)
}
private fun exit(cause: Throwable?) {
state =
if (cause == null)
- ServerState.SHUTOFF
+ ServerState.TERMINATED
else
ServerState.ERROR
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e48fd947..931d8d68 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -148,7 +148,13 @@ internal class SimHostTest {
override val image: Image
) : Server {
override val tags: Map<String, String> = emptyMap()
- override val state: ServerState = ServerState.BUILD
+ override val state: ServerState = ServerState.TERMINATED
+
+ override suspend fun start() {}
+
+ override suspend fun stop() {}
+
+ override suspend fun delete() {}
override fun watch(watcher: ServerWatcher) {}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 6b348ed4..5ae503a7 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -261,13 +261,14 @@ public class StageWorkflowService(
val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
val image = instance.task.image
coroutineScope.launch {
- val server = computeClient.newServer(instance.task.name, image, flavor)
+ val server = computeClient.newServer(instance.task.name, image, flavor, start = false)
instance.state = TaskStatus.ACTIVE
instance.server = server
taskByServer[server] = instance
server.watch(this@StageWorkflowService)
+ server.start()
}
activeTasks += instance
@@ -278,7 +279,8 @@ public class StageWorkflowService(
public override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
- ServerState.ACTIVE -> {
+ ServerState.PROVISIONING -> {}
+ ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
tracer.commit(
@@ -290,8 +292,11 @@ public class StageWorkflowService(
)
rootListener.taskStarted(task)
}
- ServerState.SHUTOFF, ServerState.ERROR -> {
+ ServerState.TERMINATED, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
+
+ coroutineScope.launch { server.delete() }
+
val job = task.job
task.state = TaskStatus.FINISHED
task.finishedAt = clock.millis()
@@ -322,6 +327,7 @@ public class StageWorkflowService(
requestCycle()
}
+ ServerState.DELETED -> {}
else -> throw IllegalStateException()
}
}