summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt29
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt37
6 files changed, 65 insertions, 81 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index fad8757e..efcc0f2c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -70,11 +70,8 @@ public interface Host {
/**
* Register the specified [instance][server] on the host.
- *
- * Once the method returns, the instance should be running if [start] is true or else the instance should be
- * stopped.
*/
- public suspend fun spawn(server: Server, start: Boolean = true)
+ public fun spawn(server: Server)
/**
* Determine whether the specified [instance][server] exists on the host.
@@ -86,19 +83,19 @@ public interface Host {
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun start(server: Server)
+ public fun start(server: Server)
/**
* Stop the server [instance][server] if it is currently running on this host.
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun stop(server: Server)
+ public fun stop(server: Server)
/**
* Delete the specified [instance][server] on this host and cleanup all resources associated with it.
*/
- public suspend fun delete(server: Server)
+ public fun delete(server: Server)
/**
* Add a [HostListener] to this host.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 0fe016aa..b377c3e3 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,11 +22,6 @@
package org.opendc.compute.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
@@ -53,23 +48,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use in the service.
+ * @param coroutineContext The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- private val context: CoroutineContext,
+ coroutineContext: CoroutineContext,
private val clock: Clock,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -115,6 +105,9 @@ internal class ComputeServiceImpl(
private val serverById = mutableMapOf<UUID, InternalServer>()
override val servers: MutableList<Server> = mutableListOf()
+ override val hosts: Set<Host>
+ get() = hostToView.keys
+
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -122,17 +115,15 @@ internal class ComputeServiceImpl(
private var _attemptsError = 0L
private var _serversPending = 0
private var _serversActive = 0
+ private var isClosed = false
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
-
- override val hosts: Set<Host>
- get() = hostToView.keys
+ private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
- check(scope.isActive) { "Service is already closed" }
+ check(!isClosed) { "Service is already closed" }
return object : ComputeClient {
private var isClosed: Boolean = false
@@ -285,7 +276,12 @@ internal class ComputeServiceImpl(
}
override fun close() {
- scope.cancel()
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ pacer.cancel()
}
override fun getSchedulerStats(): SchedulerStats {
@@ -379,29 +375,23 @@ internal class ComputeServiceImpl(
logger.info { "Assigned server $server to host $host." }
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- hv.instanceCount++
- hv.provisionedCores += server.flavor.cpuCount
- hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+ try {
+ server.host = host
- scope.launch {
- try {
- server.host = host
- host.spawn(server)
- activeServers[server] = host
+ host.spawn(server)
+ host.start(server)
- _serversActive++
- _attemptsSuccess++
- } catch (e: Throwable) {
- logger.error(e) { "Failed to deploy VM" }
+ _serversActive++
+ _attemptsSuccess++
- hv.instanceCount--
- hv.provisionedCores -= server.flavor.cpuCount
- hv.availableMemory += server.flavor.memorySize
+ hv.instanceCount++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize
- _attemptsError++
- }
+ activeServers[server] = host
+ } catch (e: Throwable) {
+ logger.error(e) { "Failed to deploy VM" }
+ _attemptsError++
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 73e9b3d7..c18709f3 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -348,7 +348,7 @@ internal class ComputeServiceTest {
// Start server
server.start()
delay(5L * 60 * 1000)
- coVerify { host.spawn(capture(slot), true) }
+ coVerify { host.spawn(capture(slot)) }
listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
@@ -376,7 +376,7 @@ internal class ComputeServiceTest {
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
- coEvery { host.spawn(any(), true) } throws IllegalStateException()
+ coEvery { host.spawn(any()) } throws IllegalStateException()
service.addHost(host)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index ee607066..b3e56f38 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.yield
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
@@ -132,8 +131,8 @@ public class SimHost(
return sufficientMemory && enoughCpus && canFit
}
- override suspend fun spawn(server: Server, start: Boolean) {
- val guest = guests.computeIfAbsent(server) { key ->
+ override fun spawn(server: Server) {
+ guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
val machine = hypervisor.newMachine(key.flavor.toMachineModel())
@@ -150,27 +149,23 @@ public class SimHost(
_guests.add(newGuest)
newGuest
}
-
- if (start) {
- guest.start()
- }
}
override fun contains(server: Server): Boolean {
return server in guests
}
- override suspend fun start(server: Server) {
+ override fun start(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.start()
}
- override suspend fun stop(server: Server) {
+ override fun stop(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.stop()
}
- override suspend fun delete(server: Server) {
+ override fun delete(server: Server) {
val guest = guests[server] ?: return
guest.delete()
}
@@ -266,17 +261,10 @@ public class SimHost(
}
}
- public suspend fun recover() {
+ public fun recover() {
updateUptime()
launch()
-
- // Wait for the hypervisor to launch before recovering the guests
- yield()
-
- for (guest in _guests) {
- guest.recover()
- }
}
/**
@@ -298,6 +286,11 @@ public class SimHost(
_bootTime = clock.instant()
_state = HostState.UP
hypervisor.onStart(ctx)
+
+ // Recover the guests that were running on the hypervisor.
+ for (guest in _guests) {
+ guest.recover()
+ }
} catch (cause: Throwable) {
_state = HostState.ERROR
throw cause
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 6d3a5bc7..c12e6fad 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -60,6 +60,7 @@ internal class Guest(
* a server.
*/
var state: ServerState = ServerState.TERMINATED
+ private set
/**
* Start the guest.
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 27151422..fc581d3e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -107,20 +106,19 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
- coroutineScope {
- launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImage)) }
+ suspendCancellableCoroutine { cont ->
+ host.addListener(object : HostListener {
+ private var finished = 0
- suspendCancellableCoroutine { cont ->
- host.addListener(object : HostListener {
- private var finished = 0
-
- override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
- if (newState == ServerState.TERMINATED && ++finished == 1) {
- cont.resume(Unit)
- }
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 1) {
+ cont.resume(Unit)
}
- })
- }
+ }
+ })
+ val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage)
+ host.spawn(server)
+ host.start(server)
}
// Ensure last cycle is collected
@@ -190,9 +188,6 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
coroutineScope {
- launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
- launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
-
suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
private var finished = 0
@@ -203,6 +198,13 @@ internal class SimHostTest {
}
}
})
+ val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA)
+ host.spawn(serverA)
+ val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB)
+ host.spawn(serverB)
+
+ host.start(serverA)
+ host.start(serverB)
}
}
@@ -259,12 +261,13 @@ internal class SimHostTest {
coroutineScope {
host.spawn(server)
+ host.start(server)
delay(5000L)
host.fail()
delay(duration * 1000)
host.recover()
- suspendCancellableCoroutine<Unit> { cont ->
+ suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
if (newState == ServerState.TERMINATED) {