diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-25 10:23:47 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-26 15:40:26 +0100 |
| commit | 0bbb0adb97ba4783bbd0073f845781725e6212e8 (patch) | |
| tree | d185a7c0e556946aff1334e3b92d6f3163046c21 /simulator/opendc-compute/opendc-compute-service/src/main | |
| parent | 074dee1cbca7b3a024d45a3b9dd7d8b51acdd4ee (diff) | |
compute: Add test suite for ComputeService
This change adds a test suite for the OpenDC compute service.
Diffstat (limited to 'simulator/opendc-compute/opendc-compute-service/src/main')
9 files changed, 202 insertions, 153 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt index 29f10e27..4a8d3046 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt @@ -59,4 +59,10 @@ internal class ClientFlavor(private val delegate: Flavor) : Flavor { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Flavor && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt index 6c5b2ab0..e0b5c171 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -52,4 +52,10 @@ internal class ClientImage(private val delegate: Image) : Image { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Image && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index ae4cee3b..f2929bf3 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -104,4 +104,10 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche watcher.onStateChanged(this, newState) } } + + override fun equals(other: Any?): Boolean = other is Server && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,name=$name,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index aa7e0aa1..62808b4d 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -57,7 +57,7 @@ public class ComputeServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context) + private val scope = CoroutineScope(context + Job()) /** * The logger instance of this server. @@ -133,130 +133,133 @@ public class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size - override fun newClient(): ComputeClient = object : ComputeClient { - private var isClosed: Boolean = false + override fun newClient(): ComputeClient { + check(scope.isActive) { "Service is already closed" } + return object : ComputeClient { + private var isClosed: Boolean = false - override suspend fun queryFlavors(): List<Flavor> { - check(!isClosed) { "Client is already closed" } + override suspend fun queryFlavors(): List<Flavor> { + check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } - } + return flavors.values.map { ClientFlavor(it) } + } - override suspend fun findFlavor(id: UUID): Flavor? { - check(!isClosed) { "Client is already closed" } + override suspend fun findFlavor(id: UUID): Flavor? { + check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } - } + return flavors[id]?.let { ClientFlavor(it) } + } - override suspend fun newFlavor( - name: String, - cpuCount: Int, - memorySize: Long, - labels: Map<String, String>, - meta: Map<String, Any> - ): Flavor { - check(!isClosed) { "Client is already closed" } - - val uid = UUID(clock.millis(), random.nextLong()) - val flavor = InternalFlavor( - this@ComputeServiceImpl, - uid, - name, - cpuCount, - memorySize, - labels, - meta - ) + override suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): Flavor { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val flavor = InternalFlavor( + this@ComputeServiceImpl, + uid, + name, + cpuCount, + memorySize, + labels, + meta + ) - flavors[uid] = flavor + flavors[uid] = flavor - return ClientFlavor(flavor) - } + return ClientFlavor(flavor) + } - override suspend fun queryImages(): List<Image> { - check(!isClosed) { "Client is already closed" } + override suspend fun queryImages(): List<Image> { + check(!isClosed) { "Client is already closed" } - return images.values.map { ClientImage(it) } - } + return images.values.map { ClientImage(it) } + } - override suspend fun findImage(id: UUID): Image? { - check(!isClosed) { "Client is already closed" } + override suspend fun findImage(id: UUID): Image? { + check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } - } + return images[id]?.let { ClientImage(it) } + } - override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { - check(!isClosed) { "Client is already closed" } + override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { + check(!isClosed) { "Client is already closed" } - val uid = UUID(clock.millis(), random.nextLong()) - val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - images[uid] = image + images[uid] = image - return ClientImage(image) - } + return ClientImage(image) + } - override suspend fun newServer( - name: String, - image: Image, - flavor: Flavor, - labels: Map<String, String>, - meta: Map<String, Any>, - start: Boolean - ): Server { - check(!isClosed) { "Client is closed" } - tracer.commit(VmSubmissionEvent(name, image, flavor)) + override suspend fun newServer( + name: String, + image: Image, + flavor: Flavor, + labels: Map<String, String>, + meta: Map<String, Any>, + start: Boolean + ): Server { + check(!isClosed) { "Client is closed" } + tracer.commit(VmSubmissionEvent(name, image, flavor)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + val uid = UUID(clock.millis(), random.nextLong()) + val server = InternalServer( this@ComputeServiceImpl, - hostCount, - availableHosts.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms + uid, + name, + requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, + requireNotNull(images[image.uid]) { "Unknown image" }, + labels.toMutableMap(), + meta.toMutableMap() ) - ) - val uid = UUID(clock.millis(), random.nextLong()) - val server = InternalServer( - this@ComputeServiceImpl, - uid, - name, - flavor, - image, - labels.toMutableMap(), - meta.toMutableMap() - ) + servers[uid] = server - servers[uid] = server + if (start) { + server.start() + } - if (start) { - server.start() + return ClientServer(server) } - return ClientServer(server) - } + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } - override suspend fun findServer(id: UUID): Server? { - check(!isClosed) { "Client is already closed" } + return servers[id]?.let { ClientServer(it) } + } - return servers[id]?.let { ClientServer(it) } - } + override suspend fun queryServers(): List<Server> { + check(!isClosed) { "Client is already closed" } - override suspend fun queryServers(): List<Server> { - check(!isClosed) { "Client is already closed" } + return servers.values.map { ClientServer(it) } + } - return servers.values.map { ClientServer(it) } - } + override fun close() { + isClosed = true + } - override fun close() { - isClosed = true + override fun toString(): String = "ComputeClient" } - - override fun toString(): String = "ComputeClient" } override fun addHost(host: Host) { @@ -285,23 +288,25 @@ public class ComputeServiceImpl( scope.cancel() } - internal fun schedule(server: InternalServer) { + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - queue.add(SchedulingRequest(server)) + val request = SchedulingRequest(server) + queue.add(request) requestSchedulingCycle() + return request } internal fun delete(flavor: InternalFlavor) { - checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" } + flavors.remove(flavor.uid) } internal fun delete(image: InternalImage) { - checkNotNull(images.remove(image.uid)) { "Image was not known" } + images.remove(image.uid) } internal fun delete(server: InternalServer) { - checkNotNull(servers.remove(server.uid)) { "Server was not known" } + servers.remove(server.uid) } /** @@ -338,7 +343,7 @@ public class ComputeServiceImpl( val server = request.server val hv = allocationLogic.select(availableHosts, request.server) if (hv == null || !hv.host.canFit(server)) { - logger.trace { "Server $server selected for scheduling but no capacity available for it." } + logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) @@ -360,6 +365,8 @@ public class ComputeServiceImpl( queue.poll() logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + + server.state = ServerState.ERROR continue } else { break @@ -372,42 +379,39 @@ public class ComputeServiceImpl( queue.poll() logger.info { "Assigned server $server to host $host." } - try { - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - hv.numberOfActiveServers++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack - - scope.launch { - try { - server.assignHost(host) - host.spawn(server) - activeServers[server] = host - - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.host = host + host.spawn(server) + activeServers[server] = host + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + ) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - hv.numberOfActiveServers-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize - } + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize } - } catch (e: Exception) { - logger.warn(e) { "Failed to assign server $server to $host. " } } } } @@ -415,7 +419,7 @@ public class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - private data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer) { /** * A flag to indicate that the request is cancelled. */ diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index 1bdfdf1a..5793541f 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -32,4 +32,6 @@ public class HostView(public val host: Host) { public var numberOfActiveServers: Int = 0 public var availableMemory: Long = host.model.memorySize public var provisionedCores: Int = 0 + + override fun toString(): String = "HostView[host=$host]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt index 95e280df..b8fb6279 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt @@ -58,7 +58,9 @@ internal class InternalFlavor( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid + override fun equals(other: Any?): Boolean = other is Flavor && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt index 86f2f6b9..d9ed5896 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -48,7 +48,9 @@ internal class InternalImage( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid + override fun equals(other: Any?): Boolean = other is Image && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index ff7c1d15..d9d0f3fc 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -34,8 +34,8 @@ internal class InternalServer( private val service: ComputeServiceImpl, override val uid: UUID, override val name: String, - override val flavor: Flavor, - override val image: Image, + override val flavor: InternalFlavor, + override val image: InternalImage, override val labels: MutableMap<String, String>, override val meta: MutableMap<String, Any> ) : Server { @@ -54,6 +54,11 @@ internal class InternalServer( */ internal var host: Host? = null + /** + * The current scheduling request. + */ + private var request: ComputeServiceImpl.SchedulingRequest? = null + override suspend fun start() { when (state) { ServerState.RUNNING -> { @@ -66,35 +71,43 @@ internal class InternalServer( } ServerState.DELETED -> { logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") + throw IllegalStateException("Server is terminated") } else -> { logger.info { "User requested to start server $uid" } state = ServerState.PROVISIONING - service.schedule(this) + assert(request == null) { "Scheduling request already active" } + request = service.schedule(this) } } } override suspend fun stop() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.PROVISIONING -> { + cancelProvisioningRequest() + state = ServerState.TERMINATED + } ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.stop(this) } - ServerState.TERMINATED -> {} // No work needed - ServerState.DELETED -> throw IllegalStateException("Server is terminated") + ServerState.TERMINATED, ServerState.DELETED -> {} // No work needed } } override suspend fun delete() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these - ServerState.RUNNING -> { + ServerState.PROVISIONING, ServerState.TERMINATED -> { + cancelProvisioningRequest() + service.delete(this) + state = ServerState.DELETED + } + ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.delete(this) service.delete(this) + state = ServerState.DELETED } else -> {} // No work needed } @@ -121,11 +134,20 @@ internal class InternalServer( field = value } - internal fun assignHost(host: Host) { - this.host = host + /** + * Cancel the provisioning request if active. + */ + private fun cancelProvisioningRequest() { + val request = request + if (request != null) { + this.request = null + request.isCancelled = true + } } - override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + override fun equals(other: Any?): Boolean = other is Server && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index ed1dc662..2c953f8b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -20,14 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.AllocationPolicy - -private val logger = KotlinLogging.logger {} /** * Policy replaying VM-cluster assignment. @@ -36,6 +33,8 @@ private val logger = KotlinLogging.logger {} * assigned the VM image. */ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy { + private val logger = KotlinLogging.logger {} + override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HostView>, |
