diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-21 22:04:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:27 +0100 |
| commit | 76bfeb44c5a02be143c152c52bc1029cff360744 (patch) | |
| tree | be467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc/opendc-compute/src/main | |
| parent | bc64182612ad06f15bff5b48637ed7d241e293b2 (diff) | |
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc/opendc-compute/src/main')
12 files changed, 163 insertions, 67 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index c8caaca6..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 8b8d1596..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -33,7 +33,7 @@ import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -45,7 +45,7 @@ data class Node( public override val name: String, /** - * Meta data of the node. + * Metadata of the node. */ public val metadata: Map<String, Any>, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 5d1db378..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -38,6 +38,11 @@ import java.util.UUID */ public interface BareMetalDriver : Powerable, FailureDomain { /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow<Node> + + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. */ diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 49c3fa2e..67069c03 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.scanReduce import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -96,33 +99,40 @@ public class SimpleBareMetalDriver( /** * The flow containing the load of the server. */ - private val usageSignal = StateFlow(0.0) + private val usageState = StateFlow(0.0) /** * The machine state. */ - private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events) - set(value) { + private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow<Node> = nodeState + + override val usage: Flow<Double> = usageState + + override val powerDraw: Flow<Double> = powerModel(this) + + init { + @OptIn(ExperimentalCoroutinesApi::class) + nodeState.scanReduce { field, value -> if (field.state != value.state) { events.emit(NodeEvent.StateChanged(value, field.state)) } - if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state)) + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } - field = value - } - - override val usage: Flow<Double> = usageSignal - - override val powerDraw: Flow<Double> = powerModel(this) + value + }.launchIn(domain) + } override suspend fun init(): Node = withContext(domain.coroutineContext) { - node + nodeState.value } override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state != NodeState.SHUTOFF) { return@withContext node } @@ -139,12 +149,13 @@ public class SimpleBareMetalDriver( events ) - node = node.copy(state = NodeState.BOOT, server = server) + nodeState.value = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext(events) - return@withContext node + return@withContext nodeState.value } override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state == NodeState.SHUTOFF) { return@withContext node } @@ -153,7 +164,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - node = node.copy(state = NodeState.SHUTOFF, server = null) + nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -163,11 +174,11 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + nodeState.value = nodeState.value.copy(image = image) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { private var finalized: Boolean = false @@ -175,7 +186,7 @@ public class SimpleBareMetalDriver( override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override val server: Server - get() = node.server!! + get() = nodeState.value.server!! private val job = domain.launch { delay(1) // TODO Introduce boot time @@ -193,15 +204,15 @@ public class SimpleBareMetalDriver( */ suspend fun cancel(fail: Boolean) { if (fail) - domain.cancel(ShutdownException(cause = Exception("Random failure"))) + job.cancel(ShutdownException(cause = Exception("Random failure"))) else - domain.cancel(ShutdownException()) + job.cancel(ShutdownException()) job.join() } override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { val server = server.copy(services = server.services.put(key, service)) - node = node.copy(server = server) + nodeState.value = nodeState.value.copy(server = server) events.emit(ServerEvent.ServicePublished(server, key)) } @@ -209,24 +220,24 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - node = node.copy(state = NodeState.ACTIVE, server = server) + nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { finalized = true - val serverState = + val newServerState = if (cause == null || (cause is ShutdownException && cause.cause == null)) ServerState.SHUTOFF else ServerState.ERROR - val nodeState = + val newNodeState = if (cause == null || (cause is ShutdownException && cause.cause != null)) - node.state + nodeState.value.state else NodeState.ERROR - val server = server.copy(state = serverState) - node = node.copy(state = nodeState, server = server) + val server = server.copy(state = newServerState) + nodeState.value = nodeState.value.copy(state = newNodeState, server = server) } private var flush: Job? = null @@ -256,7 +267,7 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) @@ -269,7 +280,7 @@ public class SimpleBareMetalDriver( // Flush the load if the do not receive a new run call for the same timestamp flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -289,5 +300,6 @@ public class SimpleBareMetalDriver( override suspend fun fail() { serverContext?.cancel(fail = true) + domain.cancel() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt new file mode 100644 index 00000000..69b0124d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. + */ +public class Hypervisor( + /** + * The unique identifier of the hypervisor. + */ + override val uid: UUID, + + /** + * The optional name of the hypervisor. + */ + override val name: String, + + /** + * Metadata of the hypervisor. + */ + public val metadata: Map<String, Any>, + + /** + * The events that are emitted by the hypervisor. + */ + public val events: Flow<HypervisorEvent> +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index ccbe8b3c..3230c2ba 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -22,12 +22,14 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.virt.driver.VirtDriver /** * An event that is emitted by a [VirtDriver]. */ -public sealed class VirtDriverEvent { +public sealed class HypervisorEvent { /** * The driver that emitted the event. */ @@ -40,7 +42,11 @@ public sealed class VirtDriverEvent { * @property numberOfActiveServers The number of active servers. * @property availableMemory The available memory, in MB. */ - public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent() + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() /** * This event is emitted when a slice is finished. @@ -55,5 +61,5 @@ public sealed class VirtDriverEvent { public val requestedBurst: Long, public val grantedBurst: Long, public val numberOfDeployedImages: Int - ) : VirtDriverEvent() + ) : HypervisorEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 1eb0e0ff..c21b002d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -22,10 +22,11 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope @@ -42,7 +43,7 @@ object HypervisorImage : Image { override suspend fun invoke(ctx: ServerContext) { coroutineScope { - val driver = HypervisorVirtDriver(ctx, this) + val driver = SimpleVirtDriver(ctx, this) ctx.publishService(VirtDriver.Key, driver) // Suspend image until it is cancelled diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt index 926234b5..0586ae00 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 0b4a7109..fc4c7634 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow @@ -37,8 +37,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL @@ -62,7 +61,7 @@ import kotlin.math.min * A [VirtDriver] that is backed by a simple hypervisor implementation. */ @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -class HypervisorVirtDriver( +class SimpleVirtDriver( private val hostContext: ServerContext, private val coroutineScope: CoroutineScope ) : VirtDriver { @@ -85,9 +84,9 @@ class HypervisorVirtDriver( /** * The [EventFlow] to emit the events. */ - internal val eventFlow = EventFlow<VirtDriverEvent>() + internal val eventFlow = EventFlow<HypervisorEvent>() - override val events: Flow<VirtDriverEvent> = eventFlow + override val events: Flow<HypervisorEvent> = eventFlow override suspend fun spawn( image: Image, @@ -106,7 +105,7 @@ class HypervisorVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) - eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -223,7 +222,7 @@ class HypervisorVirtDriver( } } - eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) } this.call = call } @@ -312,7 +311,7 @@ class HypervisorVirtDriver( availableMemory += server.flavor.memorySize vms.remove(this) events.close() - eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { @@ -322,7 +321,14 @@ class HypervisorVirtDriver( this.burst = burst requests = cpus.asSequence() .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } .toList() // Wait until the burst has been run or the coroutine is cancelled diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 296f170e..d7ae0c12 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow import java.util.UUID @@ -39,7 +40,7 @@ public interface VirtDriver { /** * The events emitted by the driver. */ - public val events: Flow<VirtDriverEvent> + public val events: Flow<HypervisorEvent> /** * Spawn the given [Image] on the compute resource of this driver. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 8365f8c9..8393dfa9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, private val provisioningService: ProvisioningService -) : VirtProvisioningService { +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** * The hypervisors that have been launched by the service. */ @@ -45,18 +49,17 @@ class SimpleVirtProvisioningService( private val activeImages: MutableSet<ImageView> = mutableSetOf() init { - ctx.domain.launch { + launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage val node = provisioningService.deploy(node, hypervisorImage) node.server!!.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState) - is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) - } + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - .launchIn(ctx.domain) + }.collect() } } } @@ -64,8 +67,8 @@ class SimpleVirtProvisioningService( override suspend fun deploy( image: Image, flavor: Flavor - ) { - val vmInstance = ImageView(image, flavor) + ): Server = suspendCancellableCoroutine { cont -> + val vmInstance = ImageView(image, flavor, cont) incomingImages += vmInstance requestCycle() } @@ -77,7 +80,7 @@ class SimpleVirtProvisioningService( return } - val call = ctx.domain.launch { + val call = launch { schedule() } call.invokeOnCompletion { this.call = null } @@ -92,10 +95,12 @@ class SimpleVirtProvisioningService( try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedHv.driver.spawn( + val server = selectedHv.driver.spawn( imageInstance.image, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") @@ -103,7 +108,7 @@ class SimpleVirtProvisioningService( } } - private fun stateChanged(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { val hvView = HypervisorView( @@ -134,6 +139,7 @@ class SimpleVirtProvisioningService( data class ImageView( val image: Image, val flavor: Flavor, + val continuation: Continuation<Server>, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index da72d742..12543ce3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy @@ -16,5 +17,5 @@ interface VirtProvisioningService { * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, flavor: Flavor) + public suspend fun deploy(image: Image, flavor: Flavor): Server } |
