diff options
6 files changed, 65 insertions, 81 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index fad8757e..efcc0f2c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -70,11 +70,8 @@ public interface Host { /** * Register the specified [instance][server] on the host. - * - * Once the method returns, the instance should be running if [start] is true or else the instance should be - * stopped. */ - public suspend fun spawn(server: Server, start: Boolean = true) + public fun spawn(server: Server) /** * Determine whether the specified [instance][server] exists on the host. @@ -86,19 +83,19 @@ public interface Host { * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun start(server: Server) + public fun start(server: Server) /** * Stop the server [instance][server] if it is currently running on this host. * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun stop(server: Server) + public fun stop(server: Server) /** * Delete the specified [instance][server] on this host and cleanup all resources associated with it. */ - public suspend fun delete(server: Server) + public fun delete(server: Server) /** * Add a [HostListener] to this host. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 0fe016aa..b377c3e3 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient @@ -53,23 +48,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use in the service. + * @param coroutineContext The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( - private val context: CoroutineContext, + coroutineContext: CoroutineContext, private val clock: Clock, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -115,6 +105,9 @@ internal class ComputeServiceImpl( private val serverById = mutableMapOf<UUID, InternalServer>() override val servers: MutableList<Server> = mutableListOf() + override val hosts: Set<Host> + get() = hostToView.keys + private var maxCores = 0 private var maxMemory = 0L private var _attemptsSuccess = 0L @@ -122,17 +115,15 @@ internal class ComputeServiceImpl( private var _attemptsError = 0L private var _serversPending = 0 private var _serversActive = 0 + private var isClosed = false /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } - - override val hosts: Set<Host> - get() = hostToView.keys + private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override fun newClient(): ComputeClient { - check(scope.isActive) { "Service is already closed" } + check(!isClosed) { "Service is already closed" } return object : ComputeClient { private var isClosed: Boolean = false @@ -285,7 +276,12 @@ internal class ComputeServiceImpl( } override fun close() { - scope.cancel() + if (isClosed) { + return + } + + isClosed = true + pacer.cancel() } override fun getSchedulerStats(): SchedulerStats { @@ -379,29 +375,23 @@ internal class ComputeServiceImpl( logger.info { "Assigned server $server to host $host." } - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - hv.instanceCount++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + try { + server.host = host - scope.launch { - try { - server.host = host - host.spawn(server) - activeServers[server] = host + host.spawn(server) + host.start(server) - _serversActive++ - _attemptsSuccess++ - } catch (e: Throwable) { - logger.error(e) { "Failed to deploy VM" } + _serversActive++ + _attemptsSuccess++ - hv.instanceCount-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize + hv.instanceCount++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize - _attemptsError++ - } + activeServers[server] = host + } catch (e: Throwable) { + logger.error(e) { "Failed to deploy VM" } + _attemptsError++ } } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 73e9b3d7..c18709f3 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -348,7 +348,7 @@ internal class ComputeServiceTest { // Start server server.start() delay(5L * 60 * 1000) - coVerify { host.spawn(capture(slot), true) } + coVerify { host.spawn(capture(slot)) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -376,7 +376,7 @@ internal class ComputeServiceTest { every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - coEvery { host.spawn(any(), true) } throws IllegalStateException() + coEvery { host.spawn(any()) } throws IllegalStateException() service.addHost(host) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index ee607066..b3e56f38 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.yield import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState @@ -132,8 +131,8 @@ public class SimHost( return sufficientMemory && enoughCpus && canFit } - override suspend fun spawn(server: Server, start: Boolean) { - val guest = guests.computeIfAbsent(server) { key -> + override fun spawn(server: Server) { + guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } val machine = hypervisor.newMachine(key.flavor.toMachineModel()) @@ -150,27 +149,23 @@ public class SimHost( _guests.add(newGuest) newGuest } - - if (start) { - guest.start() - } } override fun contains(server: Server): Boolean { return server in guests } - override suspend fun start(server: Server) { + override fun start(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.start() } - override suspend fun stop(server: Server) { + override fun stop(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.stop() } - override suspend fun delete(server: Server) { + override fun delete(server: Server) { val guest = guests[server] ?: return guest.delete() } @@ -266,17 +261,10 @@ public class SimHost( } } - public suspend fun recover() { + public fun recover() { updateUptime() launch() - - // Wait for the hypervisor to launch before recovering the guests - yield() - - for (guest in _guests) { - guest.recover() - } } /** @@ -298,6 +286,11 @@ public class SimHost( _bootTime = clock.instant() _state = HostState.UP hypervisor.onStart(ctx) + + // Recover the guests that were running on the hypervisor. + for (guest in _guests) { + guest.recover() + } } catch (cause: Throwable) { _state = HostState.ERROR throw cause diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 6d3a5bc7..c12e6fad 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -60,6 +60,7 @@ internal class Guest( * a server. */ var state: ServerState = ServerState.TERMINATED + private set /** * Start the guest. diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 27151422..fc581d3e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -107,20 +106,19 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) - coroutineScope { - launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImage)) } + suspendCancellableCoroutine { cont -> + host.addListener(object : HostListener { + private var finished = 0 - suspendCancellableCoroutine { cont -> - host.addListener(object : HostListener { - private var finished = 0 - - override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - if (newState == ServerState.TERMINATED && ++finished == 1) { - cont.resume(Unit) - } + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 1) { + cont.resume(Unit) } - }) - } + } + }) + val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) + host.spawn(server) + host.start(server) } // Ensure last cycle is collected @@ -190,9 +188,6 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } - suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { private var finished = 0 @@ -203,6 +198,13 @@ internal class SimHostTest { } } }) + val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA) + host.spawn(serverA) + val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) + host.spawn(serverB) + + host.start(serverA) + host.start(serverB) } } @@ -259,12 +261,13 @@ internal class SimHostTest { coroutineScope { host.spawn(server) + host.start(server) delay(5000L) host.fail() delay(duration * 1000) host.recover() - suspendCancellableCoroutine<Unit> { cont -> + suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { override fun onStateChanged(host: Host, server: Server, newState: ServerState) { if (newState == ServerState.TERMINATED) { |
