diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-21 22:04:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:27 +0100 |
| commit | 76bfeb44c5a02be143c152c52bc1029cff360744 (patch) | |
| tree | be467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc | |
| parent | bc64182612ad06f15bff5b48637ed7d241e293b2 (diff) | |
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc')
21 files changed, 320 insertions, 186 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index c8caaca6..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 8b8d1596..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -33,7 +33,7 @@ import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -45,7 +45,7 @@ data class Node( public override val name: String, /** - * Meta data of the node. + * Metadata of the node. */ public val metadata: Map<String, Any>, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 5d1db378..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -38,6 +38,11 @@ import java.util.UUID */ public interface BareMetalDriver : Powerable, FailureDomain { /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow<Node> + + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. */ diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 49c3fa2e..67069c03 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.scanReduce import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -96,33 +99,40 @@ public class SimpleBareMetalDriver( /** * The flow containing the load of the server. */ - private val usageSignal = StateFlow(0.0) + private val usageState = StateFlow(0.0) /** * The machine state. */ - private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events) - set(value) { + private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow<Node> = nodeState + + override val usage: Flow<Double> = usageState + + override val powerDraw: Flow<Double> = powerModel(this) + + init { + @OptIn(ExperimentalCoroutinesApi::class) + nodeState.scanReduce { field, value -> if (field.state != value.state) { events.emit(NodeEvent.StateChanged(value, field.state)) } - if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state)) + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } - field = value - } - - override val usage: Flow<Double> = usageSignal - - override val powerDraw: Flow<Double> = powerModel(this) + value + }.launchIn(domain) + } override suspend fun init(): Node = withContext(domain.coroutineContext) { - node + nodeState.value } override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state != NodeState.SHUTOFF) { return@withContext node } @@ -139,12 +149,13 @@ public class SimpleBareMetalDriver( events ) - node = node.copy(state = NodeState.BOOT, server = server) + nodeState.value = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext(events) - return@withContext node + return@withContext nodeState.value } override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state == NodeState.SHUTOFF) { return@withContext node } @@ -153,7 +164,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - node = node.copy(state = NodeState.SHUTOFF, server = null) + nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -163,11 +174,11 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + nodeState.value = nodeState.value.copy(image = image) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { private var finalized: Boolean = false @@ -175,7 +186,7 @@ public class SimpleBareMetalDriver( override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override val server: Server - get() = node.server!! + get() = nodeState.value.server!! private val job = domain.launch { delay(1) // TODO Introduce boot time @@ -193,15 +204,15 @@ public class SimpleBareMetalDriver( */ suspend fun cancel(fail: Boolean) { if (fail) - domain.cancel(ShutdownException(cause = Exception("Random failure"))) + job.cancel(ShutdownException(cause = Exception("Random failure"))) else - domain.cancel(ShutdownException()) + job.cancel(ShutdownException()) job.join() } override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { val server = server.copy(services = server.services.put(key, service)) - node = node.copy(server = server) + nodeState.value = nodeState.value.copy(server = server) events.emit(ServerEvent.ServicePublished(server, key)) } @@ -209,24 +220,24 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - node = node.copy(state = NodeState.ACTIVE, server = server) + nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { finalized = true - val serverState = + val newServerState = if (cause == null || (cause is ShutdownException && cause.cause == null)) ServerState.SHUTOFF else ServerState.ERROR - val nodeState = + val newNodeState = if (cause == null || (cause is ShutdownException && cause.cause != null)) - node.state + nodeState.value.state else NodeState.ERROR - val server = server.copy(state = serverState) - node = node.copy(state = nodeState, server = server) + val server = server.copy(state = newServerState) + nodeState.value = nodeState.value.copy(state = newNodeState, server = server) } private var flush: Job? = null @@ -256,7 +267,7 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) @@ -269,7 +280,7 @@ public class SimpleBareMetalDriver( // Flush the load if the do not receive a new run call for the same timestamp flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -289,5 +300,6 @@ public class SimpleBareMetalDriver( override suspend fun fail() { serverContext?.cancel(fail = true) + domain.cancel() } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 3c77d57a..69b0124d 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -22,32 +22,37 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.monitor +package com.atlarge.opendc.compute.virt -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID /** - * An interface for monitoring the progression of workflows. + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. */ -public interface WorkflowMonitor { +public class Hypervisor( /** - * This method is invoked when a job has become active. + * The unique identifier of the hypervisor. */ - public suspend fun onJobStart(job: Job, time: Long) + override val uid: UUID, /** - * This method is invoked when a job has finished processing. + * The optional name of the hypervisor. */ - public suspend fun onJobFinish(job: Job, time: Long) + override val name: String, /** - * This method is invoked when a task of a job has started processing. + * Metadata of the hypervisor. */ - public suspend fun onTaskStart(job: Job, task: Task, time: Long) + public val metadata: Map<String, Any>, /** - * This method is invoked when a task has finished processing. + * The events that are emitted by the hypervisor. */ - public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) + public val events: Flow<HypervisorEvent> +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index ccbe8b3c..3230c2ba 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -22,12 +22,14 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.virt.driver.VirtDriver /** * An event that is emitted by a [VirtDriver]. */ -public sealed class VirtDriverEvent { +public sealed class HypervisorEvent { /** * The driver that emitted the event. */ @@ -40,7 +42,11 @@ public sealed class VirtDriverEvent { * @property numberOfActiveServers The number of active servers. * @property availableMemory The available memory, in MB. */ - public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent() + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() /** * This event is emitted when a slice is finished. @@ -55,5 +61,5 @@ public sealed class VirtDriverEvent { public val requestedBurst: Long, public val grantedBurst: Long, public val numberOfDeployedImages: Int - ) : VirtDriverEvent() + ) : HypervisorEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 1eb0e0ff..c21b002d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -22,10 +22,11 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope @@ -42,7 +43,7 @@ object HypervisorImage : Image { override suspend fun invoke(ctx: ServerContext) { coroutineScope { - val driver = HypervisorVirtDriver(ctx, this) + val driver = SimpleVirtDriver(ctx, this) ctx.publishService(VirtDriver.Key, driver) // Suspend image until it is cancelled diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt index 926234b5..0586ae00 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 0b4a7109..fc4c7634 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow @@ -37,8 +37,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL @@ -62,7 +61,7 @@ import kotlin.math.min * A [VirtDriver] that is backed by a simple hypervisor implementation. */ @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -class HypervisorVirtDriver( +class SimpleVirtDriver( private val hostContext: ServerContext, private val coroutineScope: CoroutineScope ) : VirtDriver { @@ -85,9 +84,9 @@ class HypervisorVirtDriver( /** * The [EventFlow] to emit the events. */ - internal val eventFlow = EventFlow<VirtDriverEvent>() + internal val eventFlow = EventFlow<HypervisorEvent>() - override val events: Flow<VirtDriverEvent> = eventFlow + override val events: Flow<HypervisorEvent> = eventFlow override suspend fun spawn( image: Image, @@ -106,7 +105,7 @@ class HypervisorVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) - eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -223,7 +222,7 @@ class HypervisorVirtDriver( } } - eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) } this.call = call } @@ -312,7 +311,7 @@ class HypervisorVirtDriver( availableMemory += server.flavor.memorySize vms.remove(this) events.close() - eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { @@ -322,7 +321,14 @@ class HypervisorVirtDriver( this.burst = burst requests = cpus.asSequence() .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } .toList() // Wait until the burst has been run or the coroutine is cancelled diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 296f170e..d7ae0c12 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow import java.util.UUID @@ -39,7 +40,7 @@ public interface VirtDriver { /** * The events emitted by the driver. */ - public val events: Flow<VirtDriverEvent> + public val events: Flow<HypervisorEvent> /** * Spawn the given [Image] on the compute resource of this driver. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 8365f8c9..8393dfa9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, private val provisioningService: ProvisioningService -) : VirtProvisioningService { +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** * The hypervisors that have been launched by the service. */ @@ -45,18 +49,17 @@ class SimpleVirtProvisioningService( private val activeImages: MutableSet<ImageView> = mutableSetOf() init { - ctx.domain.launch { + launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage val node = provisioningService.deploy(node, hypervisorImage) node.server!!.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState) - is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) - } + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - .launchIn(ctx.domain) + }.collect() } } } @@ -64,8 +67,8 @@ class SimpleVirtProvisioningService( override suspend fun deploy( image: Image, flavor: Flavor - ) { - val vmInstance = ImageView(image, flavor) + ): Server = suspendCancellableCoroutine { cont -> + val vmInstance = ImageView(image, flavor, cont) incomingImages += vmInstance requestCycle() } @@ -77,7 +80,7 @@ class SimpleVirtProvisioningService( return } - val call = ctx.domain.launch { + val call = launch { schedule() } call.invokeOnCompletion { this.call = null } @@ -92,10 +95,12 @@ class SimpleVirtProvisioningService( try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedHv.driver.spawn( + val server = selectedHv.driver.spawn( imageInstance.image, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") @@ -103,7 +108,7 @@ class SimpleVirtProvisioningService( } } - private fun stateChanged(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { val hvView = HypervisorView( @@ -134,6 +139,7 @@ class SimpleVirtProvisioningService( data class ImageView( val image: Image, val flavor: Flavor, + val continuation: Continuation<Server>, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index da72d742..12543ce3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy @@ -16,5 +17,5 @@ interface VirtProvisioningService { * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, flavor: Flavor) + public suspend fun deploy(image: Image, flavor: Flavor): Server } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index d86045c0..bcaafb59 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.virt.HypervisorImage import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay @@ -75,8 +76,10 @@ internal class HypervisorTest { val flavor = Flavor(1, 0) val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] - vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this) - vmDriver.spawn(workloadB, flavor) + val vmA = vmDriver.spawn(workloadA, flavor) + vmA.events.onEach { println(it) }.launchIn(this) + val vmB = vmDriver.spawn(workloadB, flavor) + vmB.events.onEach { println(it) }.launchIn(this) } runBlocking { diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index d5e1404a..b0182ab3 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.WorkflowEvent import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy @@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -62,28 +62,6 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - println("Job ${job.uid} started") - } - - override suspend fun onJobFinish(job: Job, time: Long) { - finished += 1 - println("Jobs $finished/$total finished (${job.tasks.size} tasks)") - - if (finished == total) { - token.send(true) - } - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -106,6 +84,27 @@ fun main(args: Array<String>) { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> { + println("Job ${event.job.uid} started") + } + is WorkflowEvent.JobFinished -> { + finished += 1 + println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) + } + } + } + } + .collect() + } broker.launch { val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) @@ -115,7 +114,7 @@ fun main(args: Array<String>) { val (time, job) = reader.next() total += 1 delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 0f4d0c1b..e18bbe30 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable @@ -12,7 +10,7 @@ import java.io.FileWriter class Sc20Monitor( destination: String -) : HypervisorMonitor, ServerMonitor, Closeable { +) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) private var failed: Int = 0 @@ -20,14 +18,14 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override fun stateChanged(server: Server, previousState: ServerState) { + fun stateChanged(server: Server) { println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } } - override suspend fun onSliceFinish( + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 4273c39e..96033ea7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy @@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -107,11 +110,10 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - bareMetalProvisioner, + bareMetalProvisioner ) val faultInjectorDomain = root.newDomain(name = "failures") @@ -131,8 +133,11 @@ fun main(args: Array<String>) { while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - chan.send(Unit) - scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt index b444f91c..1cb2de97 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -24,10 +24,9 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { +class JobState(val job: Job, val submittedAt: Long) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index a055a3fe..7a20363c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,13 +25,13 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch @@ -58,7 +63,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, ServerMonitor { +) : WorkflowService, CoroutineScope by domain { /** * The incoming jobs ready to be processed by the scheduler. @@ -167,6 +172,7 @@ class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() init { domain.launch { @@ -183,9 +189,11 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) + val jobInstance = JobState(job, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -217,6 +225,7 @@ class StageWorkflowService( /** * Perform a scheduling cycle immediately. */ + @OptIn(ExperimentalCoroutinesApi::class) internal suspend fun schedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() @@ -232,7 +241,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) rootListener.jobStarted(jobInstance) } @@ -280,10 +289,13 @@ class StageWorkflowService( // T4 Submit task to machine available -= host instance.state = TaskStatus.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! instance.host = newHost - taskByServer[newHost.server!!] = instance + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) activeTasks += instance taskQueue.poll() @@ -294,50 +306,48 @@ class StageWorkflowService( } } - override fun stateChanged(server: Server, previousState: ServerState) { - domain.launch { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue - } - - incomingTasks += dependent - rootListener.taskReady(dependent) + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue } - if (job.isFinished) { - finishJob(job) - } + incomingTasks += dependent + rootListener.taskReady(dependent) + } - requestCycle() + if (job.isFinished) { + finishJob(job) } - else -> throw IllegalStateException() + + requestCycle() } + else -> throw IllegalStateException() } } private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt new file mode 100644 index 00000000..2ca5a19d --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * An event emitted by the [WorkflowService]. + */ +public sealed class WorkflowEvent { + /** + * The [WorkflowService] that emitted the event. + */ + public abstract val service: WorkflowService + + /** + * This event is emitted when a job has become active. + */ + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a job has finished processing. + */ + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index 524f4f9e..38ea49c4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -36,9 +36,14 @@ import java.util.UUID */ public interface WorkflowService { /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** * Submit the specified [Job] to the workflow service for scheduling. */ - public suspend fun submit(job: Job, monitor: WorkflowMonitor) + public suspend fun submit(job: Job) /** * The service key for the workflow scheduler. diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 19e56482..5ee6d5e6 100644 --- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals @@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - jobsStarted++ - } - - override suspend fun onJobFinish(job: Job, time: Long) { - jobsFinished++ - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - tasksStarted++ - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - tasksFinished++ - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> jobsStarted++ + is WorkflowEvent.JobFinished -> jobsFinished++ + is WorkflowEvent.TaskStarted -> tasksStarted++ + is WorkflowEvent.TaskFinished -> tasksFinished++ + } + } + .collect() + } + broker.launch { val ctx = simulationContext val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) @@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val (time, job) = reader.next() jobsSubmitted++ delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } |
