From 76bfeb44c5a02be143c152c52bc1029cff360744 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 21 Mar 2020 22:04:31 +0100 Subject: refactor: Migrate to Flow for event listeners --- .../opendc/compute/core/execution/ServerContext.kt | 2 +- .../com/atlarge/opendc/compute/metal/Node.kt | 4 +- .../opendc/compute/metal/driver/BareMetalDriver.kt | 5 + .../compute/metal/driver/SimpleBareMetalDriver.kt | 72 +++-- .../com/atlarge/opendc/compute/virt/Hypervisor.kt | 58 ++++ .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 65 ++++ .../atlarge/opendc/compute/virt/HypervisorImage.kt | 57 ++++ .../driver/InsufficientMemoryOnServerException.kt | 3 + .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 348 +++++++++++++++++++++ .../opendc/compute/virt/driver/VirtDriver.kt | 3 +- .../opendc/compute/virt/driver/VirtDriverEvent.kt | 59 ---- .../virt/driver/hypervisor/HypervisorImage.kt | 56 ---- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 342 -------------------- .../InsufficientMemoryOnServerException.kt | 3 - .../virt/service/SimpleVirtProvisioningService.kt | 36 ++- .../virt/service/VirtProvisioningService.kt | 3 +- .../virt/driver/hypervisor/HypervisorTest.kt | 7 +- .../opendc/experiments/sc18/TestExperiment.kt | 51 ++- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 8 +- .../opendc/experiments/sc20/TestExperiment.kt | 13 +- .../opendc/workflows/monitor/WorkflowMonitor.kt | 53 ---- .../atlarge/opendc/workflows/service/JobState.kt | 3 +- .../workflows/service/StageWorkflowService.kt | 96 +++--- .../opendc/workflows/service/WorkflowEvent.kt | 76 +++++ .../opendc/workflows/service/WorkflowService.kt | 9 +- .../StageWorkflowSchedulerIntegrationTest.kt | 40 ++- 26 files changed, 803 insertions(+), 669 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt create mode 100644 
opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index c8caaca6..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. 
*/ public interface ServerContext { /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 8b8d1596..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -33,7 +33,7 @@ import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -45,7 +45,7 @@ data class Node( public override val name: String, /** - * Meta data of the node. + * Metadata of the node. */ public val metadata: Map, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 5d1db378..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -37,6 +37,11 @@ import java.util.UUID * A driver interface for the management interface of a bare-metal compute node. */ public interface BareMetalDriver : Powerable, FailureDomain { + /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. 
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 49c3fa2e..67069c03 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.scanReduce import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -96,33 +99,40 @@ public class SimpleBareMetalDriver( /** * The flow containing the load of the server. */ - private val usageSignal = StateFlow(0.0) + private val usageState = StateFlow(0.0) /** * The machine state. 
*/ - private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events) - set(value) { + private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow = nodeState + + override val usage: Flow = usageState + + override val powerDraw: Flow = powerModel(this) + + init { + @OptIn(ExperimentalCoroutinesApi::class) + nodeState.scanReduce { field, value -> if (field.state != value.state) { events.emit(NodeEvent.StateChanged(value, field.state)) } - if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state)) + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } - field = value - } - - override val usage: Flow = usageSignal - - override val powerDraw: Flow = powerModel(this) + value + }.launchIn(domain) + } override suspend fun init(): Node = withContext(domain.coroutineContext) { - node + nodeState.value } override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state != NodeState.SHUTOFF) { return@withContext node } @@ -139,12 +149,13 @@ public class SimpleBareMetalDriver( events ) - node = node.copy(state = NodeState.BOOT, server = server) + nodeState.value = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext(events) - return@withContext node + return@withContext nodeState.value } override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state == NodeState.SHUTOFF) { return@withContext node } @@ -153,7 +164,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - node = node.copy(state = 
NodeState.SHUTOFF, server = null) + nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -163,11 +174,11 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + nodeState.value = nodeState.value.copy(image = image) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } private inner class BareMetalServerContext(val events: EventFlow) : ServerManagementContext { private var finalized: Boolean = false @@ -175,7 +186,7 @@ public class SimpleBareMetalDriver( override val cpus: List = this@SimpleBareMetalDriver.cpus override val server: Server - get() = node.server!! + get() = nodeState.value.server!! private val job = domain.launch { delay(1) // TODO Introduce boot time @@ -193,15 +204,15 @@ public class SimpleBareMetalDriver( */ suspend fun cancel(fail: Boolean) { if (fail) - domain.cancel(ShutdownException(cause = Exception("Random failure"))) + job.cancel(ShutdownException(cause = Exception("Random failure"))) else - domain.cancel(ShutdownException()) + job.cancel(ShutdownException()) job.join() } override suspend fun publishService(key: ServiceKey, service: T) { val server = server.copy(services = server.services.put(key, service)) - node = node.copy(server = server) + nodeState.value = nodeState.value.copy(server = server) events.emit(ServerEvent.ServicePublished(server, key)) } @@ -209,24 +220,24 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - node = node.copy(state = NodeState.ACTIVE, server = server) + nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: 
Throwable?) { finalized = true - val serverState = + val newServerState = if (cause == null || (cause is ShutdownException && cause.cause == null)) ServerState.SHUTOFF else ServerState.ERROR - val nodeState = + val newNodeState = if (cause == null || (cause is ShutdownException && cause.cause != null)) - node.state + nodeState.value.state else NodeState.ERROR - val server = server.copy(state = serverState) - node = node.copy(state = nodeState, server = server) + val server = server.copy(state = newServerState) + nodeState.value = nodeState.value.copy(state = newNodeState, server = server) } private var flush: Job? = null @@ -256,7 +267,7 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) @@ -269,7 +280,7 @@ public class SimpleBareMetalDriver( // Flush the load if the do not receive a new run call for the same timestamp flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -289,5 +300,6 @@ public class SimpleBareMetalDriver( override suspend fun fail() { serverContext?.cancel(fail = true) + domain.cancel() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt new file mode 100644 index 00000000..69b0124d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons 
to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. + */ +public class Hypervisor( + /** + * The unique identifier of the hypervisor. + */ + override val uid: UUID, + + /** + * The optional name of the hypervisor. + */ + override val name: String, + + /** + * Metadata of the hypervisor. + */ + public val metadata: Map, + + /** + * The events that are emitted by the hypervisor. 
+ */ + public val events: Flow +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt new file mode 100644 index 00000000..3230c2ba --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +/** + * An event that is emitted by a [VirtDriver]. + */ +public sealed class HypervisorEvent { + /** + * The driver that emitted the event. 
+ */ + public abstract val driver: VirtDriver + + /** + * This event is emitted when the number of active servers on the server managed by this driver is updated. + * + * @property driver The driver that emitted the event. + * @property numberOfActiveServers The number of active servers. + * @property availableMemory The available memory, in MB. + */ + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() + + /** + * This event is emitted when a slice is finished. + * + * @property driver The driver that emitted the event. + * @property requestedBurst The total requested CPU time (can be above capacity). + * @property grantedBurst The actual total granted capacity. + * @property numberOfDeployedImages The number of images deployed on this hypervisor. + */ + public data class SliceFinished( + override val driver: VirtDriver, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val numberOfDeployedImages: Int + ) : HypervisorEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt new file mode 100644 index 00000000..c21b002d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above 
copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID + +/** + * A hypervisor managing the VMs of a node. 
+ */ +object HypervisorImage : Image { + override val uid: UUID = UUID.randomUUID() + override val name: String = "vmm" + override val tags: TagContainer = emptyMap() + + override suspend fun invoke(ctx: ServerContext) { + coroutineScope { + val driver = SimpleVirtDriver(ctx, this) + ctx.publishService(VirtDriver.Key, driver) + + // Suspend image until it is cancelled + try { + suspendCancellableCoroutine {} + } finally { + driver.eventFlow.close() + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt new file mode 100644 index 00000000..0586ae00 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -0,0 +1,3 @@ +package com.atlarge.opendc.compute.virt.driver + +public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt new file mode 100644 index 00000000..fc4c7634 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -0,0 +1,348 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * 
The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import 
kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [VirtDriver] that is backed by a simple hypervisor implementation. + */ +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +class SimpleVirtDriver( + private val hostContext: ServerContext, + private val coroutineScope: CoroutineScope +) : VirtDriver { + /** + * The [Server] on which this hypervisor runs. + */ + public val server: Server + get() = hostContext.server + + /** + * A set for tracking the VM context objects. + */ + internal val vms: MutableSet = mutableSetOf() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = hostContext.server.flavor.memorySize + + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow() + + override val events: Flow = eventFlow + + override suspend fun spawn( + image: Image, + flavor: Flavor + ): Server { + val requiredMemory = flavor.memorySize + if (availableMemory - requiredMemory < 0) { + throw InsufficientMemoryOnServerException() + } + require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } + + val events = EventFlow() + val server = Server( + UUID.randomUUID(), "", emptyMap(), flavor, image, ServerState.BUILD, + ServiceRegistry(), events + ) + availableMemory -= requiredMemory + vms.add(VmServerContext(server, events, simulationContext.domain)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) + return server + } + + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf() + + /** + * The deferred run call. + */ + private var call: Job? 
= null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + + // Do not schedule a call if there is no work to schedule or the driver stopped. + if (stopped || activeVms.isEmpty()) { + return + } + + val call = coroutineScope.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + + val maxUsage = hostContext.cpus.sumByDouble { it.frequency } + var availableUsage = maxUsage + val requests = vms.asSequence() + .flatMap { it.requests.asSequence() } + .sortedBy { it.limit } + .toList() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in requests.withIndex()) { + val remaining = requests.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + req.allocatedUsage = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / req.allocatedUsage) + deadline = min(deadline, req.vm.deadline) + } + + val usage = DoubleArray(hostContext.cpus.size) + val burst = LongArray(hostContext.cpus.size) + val totalUsage = maxUsage - availableUsage + availableUsage = totalUsage + val serverLoad = totalUsage / maxUsage + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) { + val remaining = hostContext.cpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) + + usage[i] = grantedUsage + burst[i] = (duration * grantedUsage).toLong() + availableUsage -= grantedUsage + } + + val remainder = burst.clone() + // We run the total burst on the host processor. 
Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + hostContext.run(remainder, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + val totalRemainder = remainder.sum() + val totalBurst = burst.sum() + val imagesRunning = vms.map { it.server.image }.toSet() + + for (vm in vms) { + // Apply performance interference model + val performanceModel = + vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceScore = performanceModel?.apply(imagesRunning, serverLoad) ?: 1.0 + + for ((i, req) in vm.requests.withIndex()) { + // Compute the fraction of compute time allocated to the VM + val fraction = req.allocatedUsage / totalUsage + + // Derive the burst that was allocated to this vCPU + val allocatedBurst = ceil(duration * req.allocatedUsage).toLong() + + // Compute the burst time that the VM was actually granted + val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong() + + // Compute remaining burst time to be executed for the request + req.burst = max(0, vm.burst[i] - grantedBurst) + vm.burst[i] = req.burst + } + + if (vm.burst.any { it == 0L } || vm.deadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) + } + this.call = call + } + + /** + * Flush the progress of the current active VMs. + */ + private suspend fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its + // completion. + call.cancelAndJoin() + this.call = null + } + + /** + * A request to schedule a virtual CPU on the host cpu. 
+ */ + internal data class CpuRequest( + val vm: VmServerContext, + val vcpu: ProcessingUnit, + var burst: Long, + val limit: Double + ) { + /** + * The usage that was actually granted. + */ + var allocatedUsage: Double = 0.0 + } + + internal inner class VmServerContext( + server: Server, + val events: EventFlow, + val domain: Domain + ) : ServerManagementContext { + private var finalized: Boolean = false + lateinit var requests: List + lateinit var burst: LongArray + var deadline: Long = 0L + var chan = Channel(Channel.RENDEZVOUS) + private var initialized: Boolean = false + + internal val job: Job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + try { + server.image(this@VmServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) + } + } + + override var server: Server = server + set(value) { + if (field.state != value.state) { + events.emit(ServerEvent.StateChanged(value, field.state)) + } + + field = value + } + + override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) + + override suspend fun publishService(key: ServiceKey, service: T) { + server = server.copy(services = server.services.put(key, service)) + events.emit(ServerEvent.ServicePublished(server, key)) + } + + override suspend fun init() { + assert(!finalized) { "VM is already finalized" } + + server = server.copy(state = ServerState.ACTIVE) + initialized = true + } + + override suspend fun exit(cause: Throwable?) 
{ + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + server = server.copy(state = serverState) + availableMemory += server.flavor.memorySize + vms.remove(this) + events.close() + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) + } + + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } + + this.deadline = deadline + this.burst = burst + requests = cpus.asSequence() + .take(burst.size) + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } + .toList() + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (e: CancellationException) { + // On cancellation, we compute and return the remaining burst + e.assertFailure() + } finally { + activeVms -= this + reschedule() + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 296f170e..d7ae0c12 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow import java.util.UUID @@ -39,7 +40,7 @@ public interface VirtDriver { /** * The events emitted by the driver. 
*/ - public val events: Flow + public val events: Flow /** * Spawn the given [Image] on the compute resource of this driver. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt deleted file mode 100644 index ccbe8b3c..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver - -/** - * An event that is emitted by a [VirtDriver]. - */ -public sealed class VirtDriverEvent { - /** - * The driver that emitted the event. 
- */ - public abstract val driver: VirtDriver - - /** - * This event is emitted when the number of active servers on the server managed by this driver is updated. - * - * @property driver The driver that emitted the event. - * @property numberOfActiveServers The number of active servers. - * @property availableMemory The available memory, in MB. - */ - public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent() - - /** - * This event is emitted when a slice is finished. - * - * @property driver The driver that emitted the event. - * @property requestedBurst The total requested CPU time (can be above capacity). - * @property grantedBurst The actual total granted capacity. - * @property numberOfDeployedImages The number of images deployed on this hypervisor. - */ - public data class SliceFinished( - override val driver: VirtDriver, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val numberOfDeployedImages: Int - ) : VirtDriverEvent() -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt deleted file mode 100644 index 1eb0e0ff..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the 
following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.suspendCancellableCoroutine -import java.util.UUID - -/** - * A hypervisor managing the VMs of a node. 
- */ -object HypervisorImage : Image { - override val uid: UUID = UUID.randomUUID() - override val name: String = "vmm" - override val tags: TagContainer = emptyMap() - - override suspend fun invoke(ctx: ServerContext) { - coroutineScope { - val driver = HypervisorVirtDriver(ctx, this) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - try { - suspendCancellableCoroutine {} - } finally { - driver.eventFlow.close() - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt deleted file mode 100644 index 0b4a7109..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ /dev/null @@ -1,342 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.core.execution.ServerManagementContext -import com.atlarge.opendc.compute.core.execution.ShutdownException -import com.atlarge.opendc.compute.core.execution.assertFailure -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent -import com.atlarge.opendc.core.services.ServiceKey -import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A [VirtDriver] that is backed by a simple hypervisor implementation. 
- */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -class HypervisorVirtDriver( - private val hostContext: ServerContext, - private val coroutineScope: CoroutineScope -) : VirtDriver { - /** - * The [Server] on which this hypervisor runs. - */ - public val server: Server - get() = hostContext.server - - /** - * A set for tracking the VM context objects. - */ - internal val vms: MutableSet = mutableSetOf() - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = hostContext.server.flavor.memorySize - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow() - - override val events: Flow = eventFlow - - override suspend fun spawn( - image: Image, - flavor: Flavor - ): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - - val events = EventFlow() - val server = Server( - UUID.randomUUID(), "", emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events - ) - availableMemory -= requiredMemory - vms.add(VmServerContext(server, events, simulationContext.domain)) - eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The set of [VmServerContext] instances that is being scheduled at the moment. - */ - private val activeVms = mutableSetOf() - - /** - * The deferred run call. - */ - private var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPUs. - */ - private suspend fun reschedule() { - flush() - - // Do not schedule a call if there is no work to schedule or the driver stopped. 
- if (stopped || activeVms.isEmpty()) { - return - } - - val call = coroutineScope.launch { - val start = simulationContext.clock.millis() - val vms = activeVms.toSet() - - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - - val maxUsage = hostContext.cpus.sumByDouble { it.frequency } - var availableUsage = maxUsage - val requests = vms.asSequence() - .flatMap { it.requests.asSequence() } - .sortedBy { it.limit } - .toList() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in requests.withIndex()) { - val remaining = requests.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - req.allocatedUsage = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / req.allocatedUsage) - deadline = min(deadline, req.vm.deadline) - } - - val usage = DoubleArray(hostContext.cpus.size) - val burst = LongArray(hostContext.cpus.size) - val totalUsage = maxUsage - availableUsage - availableUsage = totalUsage - val serverLoad = totalUsage / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) { - val remaining = hostContext.cpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) - - usage[i] = grantedUsage - burst[i] = (duration * grantedUsage).toLong() - availableUsage -= grantedUsage - } - - val remainder = burst.clone() - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. 
- hostContext.run(remainder, usage, deadline) - val end = simulationContext.clock.millis() - - // No work was performed - if ((end - start) <= 0) { - return@launch - } - - val totalRemainder = remainder.sum() - val totalBurst = burst.sum() - val imagesRunning = vms.map { it.server.image }.toSet() - - for (vm in vms) { - // Apply performance interference model - val performanceModel = - vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - val performanceScore = performanceModel?.apply(imagesRunning, serverLoad) ?: 1.0 - - for ((i, req) in vm.requests.withIndex()) { - // Compute the fraction of compute time allocated to the VM - val fraction = req.allocatedUsage / totalUsage - - // Derive the burst that was allocated to this vCPU - val allocatedBurst = ceil(duration * req.allocatedUsage).toLong() - - // Compute the burst time that the VM was actually granted - val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong() - - // Compute remaining burst time to be executed for the request - req.burst = max(0, vm.burst[i] - grantedBurst) - vm.burst[i] = req.burst - } - - if (vm.burst.any { it == 0L } || vm.deadline <= end) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) - } - } - - eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) - } - this.call = call - } - - /** - * Flush the progress of the current active VMs. - */ - private suspend fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its - // completion. - call.cancelAndJoin() - this.call = null - } - - /** - * A request to schedule a virtual CPU on the host cpu. 
- */ - internal data class CpuRequest( - val vm: VmServerContext, - val vcpu: ProcessingUnit, - var burst: Long, - val limit: Double - ) { - /** - * The usage that was actually granted. - */ - var allocatedUsage: Double = 0.0 - } - - internal inner class VmServerContext( - server: Server, - val events: EventFlow, - val domain: Domain - ) : ServerManagementContext { - private var finalized: Boolean = false - lateinit var requests: List - lateinit var burst: LongArray - var deadline: Long = 0L - var chan = Channel(Channel.RENDEZVOUS) - private var initialized: Boolean = false - - internal val job: Job = coroutineScope.launch { - delay(1) // TODO Introduce boot time - init() - try { - server.image(this@VmServerContext) - exit() - } catch (cause: Throwable) { - exit(cause) - } - } - - override var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) - - override suspend fun publishService(key: ServiceKey, service: T) { - server = server.copy(services = server.services.put(key, service)) - events.emit(ServerEvent.ServicePublished(server, key)) - } - - override suspend fun init() { - assert(!finalized) { "VM is already finalized" } - - server = server.copy(state = ServerState.ACTIVE) - initialized = true - } - - override suspend fun exit(cause: Throwable?) 
{ - finalized = true - - val serverState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - events.close() - eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory)) - } - - override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { - require(burst.size == limit.size) { "Array dimensions do not match" } - - this.deadline = deadline - this.burst = burst - requests = cpus.asSequence() - .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } - .toList() - - // Wait until the burst has been run or the coroutine is cancelled - try { - activeVms += this - reschedule() - chan.receive() - } catch (e: CancellationException) { - // On cancellation, we compute and return the remaining burst - e.assertFailure() - } finally { - activeVms -= this - reschedule() - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt deleted file mode 100644 index 926234b5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor - -public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 8365f8c9..8393dfa9 100644 --- 
a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, private val provisioningService: ProvisioningService -) : VirtProvisioningService { +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** * The hypervisors that have been launched by the service. 
*/ @@ -45,18 +49,17 @@ class SimpleVirtProvisioningService( private val activeImages: MutableSet = mutableSetOf() init { - ctx.domain.launch { + launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage val node = provisioningService.deploy(node, hypervisorImage) node.server!!.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState) - is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) - } + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - .launchIn(ctx.domain) + }.collect() } } } @@ -64,8 +67,8 @@ class SimpleVirtProvisioningService( override suspend fun deploy( image: Image, flavor: Flavor - ) { - val vmInstance = ImageView(image, flavor) + ): Server = suspendCancellableCoroutine { cont -> + val vmInstance = ImageView(image, flavor, cont) incomingImages += vmInstance requestCycle() } @@ -77,7 +80,7 @@ class SimpleVirtProvisioningService( return } - val call = ctx.domain.launch { + val call = launch { schedule() } call.invokeOnCompletion { this.call = null } @@ -92,10 +95,12 @@ class SimpleVirtProvisioningService( try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedHv.driver.spawn( + val server = selectedHv.driver.spawn( imageInstance.image, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") @@ -103,7 +108,7 @@ class SimpleVirtProvisioningService( } } - private fun stateChanged(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { val hvView = HypervisorView( @@ 
-134,6 +139,7 @@ class SimpleVirtProvisioningService( data class ImageView( val image: Image, val flavor: Flavor, + val continuation: Continuation, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index da72d742..12543ce3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy @@ -16,5 +17,5 @@ interface VirtProvisioningService { * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. 
*/ - public suspend fun deploy(image: Image, flavor: Flavor) + public suspend fun deploy(image: Image, flavor: Flavor): Server } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index d86045c0..bcaafb59 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.virt.HypervisorImage import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay @@ -75,8 +76,10 @@ internal class HypervisorTest { val flavor = Flavor(1, 0) val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] - vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this) - vmDriver.spawn(workloadB, flavor) + val vmA = vmDriver.spawn(workloadA, flavor) + vmA.events.onEach { println(it) }.launchIn(this) + val vmB = vmDriver.spawn(workloadB, flavor) + vmB.events.onEach { println(it) }.launchIn(this) } runBlocking { diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index d5e1404a..b0182ab3 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ 
-29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.WorkflowEvent import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy @@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -62,28 +62,6 @@ fun main(args: Array) { var finished = 0 val token = Channel() - - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - println("Job ${job.uid} started") - } - - override suspend fun onJobFinish(job: Job, time: Long) { - finished += 1 - println("Jobs $finished/$total finished (${job.tasks.size} tasks)") - - if (finished == total) { - token.send(true) - } - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: 
Int, time: Long) { - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -106,6 +84,27 @@ fun main(args: Array) { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> { + println("Job ${event.job.uid} started") + } + is WorkflowEvent.JobFinished -> { + finished += 1 + println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) + } + } + } + } + .collect() + } broker.launch { val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) @@ -115,7 +114,7 @@ fun main(args: Array) { val (time, job) = reader.next() total += 1 delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 0f4d0c1b..e18bbe30 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable @@ -12,7 +10,7 @@ import java.io.FileWriter class Sc20Monitor( destination: String -) : HypervisorMonitor, ServerMonitor, Closeable { +) : Closeable { private val outputFile = 
BufferedWriter(FileWriter(destination)) private var failed: Int = 0 @@ -20,14 +18,14 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override fun stateChanged(server: Server, previousState: ServerState) { + fun stateChanged(server: Server) { println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } } - override suspend fun onSliceFinish( + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 4273c39e..96033ea7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy @@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -107,11 +110,10 @@ fun main(args: Array) { println(simulationContext.clock.instant()) val bareMetalProvisioner = 
environment.platforms[0].zones[0].services[ProvisioningService.Key] - val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - bareMetalProvisioner, + bareMetalProvisioner ) val faultInjectorDomain = root.newDomain(name = "failures") @@ -131,8 +133,11 @@ fun main(args: Array) { while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - chan.send(Unit) - scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt deleted file mode 100644 index 3c77d57a..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.monitor - -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task - -/** - * An interface for monitoring the progression of workflows. - */ -public interface WorkflowMonitor { - /** - * This method is invoked when a job has become active. - */ - public suspend fun onJobStart(job: Job, time: Long) - - /** - * This method is invoked when a job has finished processing. - */ - public suspend fun onJobFinish(job: Job, time: Long) - - /** - * This method is invoked when a task of a job has started processing. - */ - public suspend fun onTaskStart(job: Job, task: Task, time: Long) - - /** - * This method is invoked when a task has finished processing. 
- */ - public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt index b444f91c..1cb2de97 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -24,10 +24,9 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { +class JobState(val job: Job, val submittedAt: Long) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index a055a3fe..7a20363c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,13 +25,13 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import 
com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch @@ -58,7 +63,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, ServerMonitor { +) : WorkflowService, CoroutineScope by domain { /** * The incoming jobs ready to be processed by the scheduler. 
@@ -167,6 +172,7 @@ class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator + private val eventFlow = EventFlow() init { domain.launch { @@ -183,9 +189,11 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { + override val events: Flow = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) + val jobInstance = JobState(job, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -217,6 +225,7 @@ class StageWorkflowService( /** * Perform a scheduling cycle immediately. */ + @OptIn(ExperimentalCoroutinesApi::class) internal suspend fun schedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() @@ -232,7 +241,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) rootListener.jobStarted(jobInstance) } @@ -280,10 +289,13 @@ class StageWorkflowService( // T4 Submit task to machine available -= host instance.state = TaskStatus.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! instance.host = newHost - taskByServer[newHost.server!!] 
= instance + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) activeTasks += instance taskQueue.poll() @@ -294,50 +306,48 @@ class StageWorkflowService( } } - override fun stateChanged(server: Server, previousState: ServerState) { - domain.launch { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue - } - - incomingTasks += dependent - rootListener.taskReady(dependent) + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! 
+ activeTasks -= task + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue } - if (job.isFinished) { - finishJob(job) - } + incomingTasks += dependent + rootListener.taskReady(dependent) + } - requestCycle() + if (job.isFinished) { + finishJob(job) } - else -> throw IllegalStateException() + + requestCycle() } + else -> throw IllegalStateException() } } private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt new file mode 100644 index 00000000..2ca5a19d --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * An event emitted by the [WorkflowService]. + */ +public sealed class WorkflowEvent { + /** + * The [WorkflowService] that emitted the event. + */ + public abstract val service: WorkflowService + + /** + * This event is emitted when a job has become active. + */ + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a job has finished processing. + */ + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has finished processing. 
+ */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index 524f4f9e..38ea49c4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -35,10 +35,15 @@ import java.util.UUID * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. */ public interface WorkflowService { + /** + * The events emitted by the workflow scheduler. + */ + public val events: Flow + /** * Submit the specified [Job] to the workflow service for scheduling. */ - public suspend fun submit(job: Job, monitor: WorkflowMonitor) + public suspend fun submit(job: Job) /** * The service key for the workflow scheduler. 
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 19e56482..5ee6d5e6 100644 --- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals @@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - jobsStarted++ - } - - override suspend fun 
onJobFinish(job: Job, time: Long) { - jobsFinished++ - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - tasksStarted++ - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - tasksFinished++ - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> jobsStarted++ + is WorkflowEvent.JobFinished -> jobsFinished++ + is WorkflowEvent.TaskStarted -> tasksStarted++ + is WorkflowEvent.TaskFinished -> tasksFinished++ + } + } + .collect() + } + broker.launch { val ctx = simulationContext val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) @@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val (time, job) = reader.next() jobsSubmitted++ delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } -- cgit v1.2.3