summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-28 14:37:22 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-28 15:24:17 +0200
commitdd5bbd55fc6e25efdfe93ec16bd37c5350e04c16 (patch)
tree30b4cb964836b57632404ccd944fbf03e0eb32eb
parentc6f2d16a20bfac466480c0e98341b08b12fc0772 (diff)
refactor(compute/service): Do not suspend on guest start
This change updates the `Host` interface to remove the suspend modifiers to the start, stop, spawn, and delete methods of this interface. We now assume that the host immediately launches the guest on invocation of this method.
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt29
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt37
6 files changed, 65 insertions, 81 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index fad8757e..efcc0f2c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -70,11 +70,8 @@ public interface Host {
/**
* Register the specified [instance][server] on the host.
- *
- * Once the method returns, the instance should be running if [start] is true or else the instance should be
- * stopped.
*/
- public suspend fun spawn(server: Server, start: Boolean = true)
+ public fun spawn(server: Server)
/**
* Determine whether the specified [instance][server] exists on the host.
@@ -86,19 +83,19 @@ public interface Host {
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun start(server: Server)
+ public fun start(server: Server)
/**
* Stop the server [instance][server] if it is currently running on this host.
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun stop(server: Server)
+ public fun stop(server: Server)
/**
* Delete the specified [instance][server] on this host and cleanup all resources associated with it.
*/
- public suspend fun delete(server: Server)
+ public fun delete(server: Server)
/**
* Add a [HostListener] to this host.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 0fe016aa..b377c3e3 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,11 +22,6 @@
package org.opendc.compute.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
@@ -53,23 +48,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use in the service.
+ * @param coroutineContext The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- private val context: CoroutineContext,
+ coroutineContext: CoroutineContext,
private val clock: Clock,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -115,6 +105,9 @@ internal class ComputeServiceImpl(
private val serverById = mutableMapOf<UUID, InternalServer>()
override val servers: MutableList<Server> = mutableListOf()
+ override val hosts: Set<Host>
+ get() = hostToView.keys
+
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -122,17 +115,15 @@ internal class ComputeServiceImpl(
private var _attemptsError = 0L
private var _serversPending = 0
private var _serversActive = 0
+ private var isClosed = false
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
-
- override val hosts: Set<Host>
- get() = hostToView.keys
+ private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
- check(scope.isActive) { "Service is already closed" }
+ check(!isClosed) { "Service is already closed" }
return object : ComputeClient {
private var isClosed: Boolean = false
@@ -285,7 +276,12 @@ internal class ComputeServiceImpl(
}
override fun close() {
- scope.cancel()
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ pacer.cancel()
}
override fun getSchedulerStats(): SchedulerStats {
@@ -379,29 +375,23 @@ internal class ComputeServiceImpl(
logger.info { "Assigned server $server to host $host." }
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- hv.instanceCount++
- hv.provisionedCores += server.flavor.cpuCount
- hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+ try {
+ server.host = host
- scope.launch {
- try {
- server.host = host
- host.spawn(server)
- activeServers[server] = host
+ host.spawn(server)
+ host.start(server)
- _serversActive++
- _attemptsSuccess++
- } catch (e: Throwable) {
- logger.error(e) { "Failed to deploy VM" }
+ _serversActive++
+ _attemptsSuccess++
- hv.instanceCount--
- hv.provisionedCores -= server.flavor.cpuCount
- hv.availableMemory += server.flavor.memorySize
+ hv.instanceCount++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize
- _attemptsError++
- }
+ activeServers[server] = host
+ } catch (e: Throwable) {
+ logger.error(e) { "Failed to deploy VM" }
+ _attemptsError++
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 73e9b3d7..c18709f3 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -348,7 +348,7 @@ internal class ComputeServiceTest {
// Start server
server.start()
delay(5L * 60 * 1000)
- coVerify { host.spawn(capture(slot), true) }
+ coVerify { host.spawn(capture(slot)) }
listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
@@ -376,7 +376,7 @@ internal class ComputeServiceTest {
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
- coEvery { host.spawn(any(), true) } throws IllegalStateException()
+ coEvery { host.spawn(any()) } throws IllegalStateException()
service.addHost(host)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index ee607066..b3e56f38 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.yield
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
@@ -132,8 +131,8 @@ public class SimHost(
return sufficientMemory && enoughCpus && canFit
}
- override suspend fun spawn(server: Server, start: Boolean) {
- val guest = guests.computeIfAbsent(server) { key ->
+ override fun spawn(server: Server) {
+ guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
val machine = hypervisor.newMachine(key.flavor.toMachineModel())
@@ -150,27 +149,23 @@ public class SimHost(
_guests.add(newGuest)
newGuest
}
-
- if (start) {
- guest.start()
- }
}
override fun contains(server: Server): Boolean {
return server in guests
}
- override suspend fun start(server: Server) {
+ override fun start(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.start()
}
- override suspend fun stop(server: Server) {
+ override fun stop(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.stop()
}
- override suspend fun delete(server: Server) {
+ override fun delete(server: Server) {
val guest = guests[server] ?: return
guest.delete()
}
@@ -266,17 +261,10 @@ public class SimHost(
}
}
- public suspend fun recover() {
+ public fun recover() {
updateUptime()
launch()
-
- // Wait for the hypervisor to launch before recovering the guests
- yield()
-
- for (guest in _guests) {
- guest.recover()
- }
}
/**
@@ -298,6 +286,11 @@ public class SimHost(
_bootTime = clock.instant()
_state = HostState.UP
hypervisor.onStart(ctx)
+
+ // Recover the guests that were running on the hypervisor.
+ for (guest in _guests) {
+ guest.recover()
+ }
} catch (cause: Throwable) {
_state = HostState.ERROR
throw cause
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 6d3a5bc7..c12e6fad 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -60,6 +60,7 @@ internal class Guest(
* a server.
*/
var state: ServerState = ServerState.TERMINATED
+ private set
/**
* Start the guest.
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 27151422..fc581d3e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -107,20 +106,19 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
- coroutineScope {
- launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImage)) }
+ suspendCancellableCoroutine { cont ->
+ host.addListener(object : HostListener {
+ private var finished = 0
- suspendCancellableCoroutine { cont ->
- host.addListener(object : HostListener {
- private var finished = 0
-
- override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
- if (newState == ServerState.TERMINATED && ++finished == 1) {
- cont.resume(Unit)
- }
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 1) {
+ cont.resume(Unit)
}
- })
- }
+ }
+ })
+ val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage)
+ host.spawn(server)
+ host.start(server)
}
// Ensure last cycle is collected
@@ -190,9 +188,6 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
coroutineScope {
- launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
- launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
-
suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
private var finished = 0
@@ -203,6 +198,13 @@ internal class SimHostTest {
}
}
})
+ val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA)
+ host.spawn(serverA)
+ val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB)
+ host.spawn(serverB)
+
+ host.start(serverA)
+ host.start(serverB)
}
}
@@ -259,12 +261,13 @@ internal class SimHostTest {
coroutineScope {
host.spawn(server)
+ host.start(server)
delay(5000L)
host.fail()
delay(duration * 1000)
host.recover()
- suspendCancellableCoroutine<Unit> { cont ->
+ suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
if (newState == ServerState.TERMINATED) {