diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
| commit | 620f194c53d950a37f78577f4aacfd7c0c06bb9a (patch) | |
| tree | f5f7ffdce8efdcffb92e158ebbb643ba1a797b23 | |
| parent | f4ee29bb97aed68329e72710dd3049c23f592f25 (diff) | |
| parent | 7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b (diff) | |
Merge branch 'feat/2.x-failures' into '2.x'
Implement basic hardware-level failures
See merge request opendc/opendc-simulator!35
54 files changed, 1391 insertions, 547 deletions
diff --git a/buildSrc/src/main/kotlin/library.kt b/buildSrc/src/main/kotlin/library.kt index 6333e351..3b05f3a4 100644 --- a/buildSrc/src/main/kotlin/library.kt +++ b/buildSrc/src/main/kotlin/library.kt @@ -45,5 +45,5 @@ object Library { /** * Kotlin coroutines support */ - val KOTLINX_COROUTINES = "1.3.4" + val KOTLINX_COROUTINES = "1.3.5" } diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt new file mode 100644 index 00000000..5d9af9ec --- /dev/null +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt @@ -0,0 +1,99 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.flow + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.consumeAsFlow +import java.util.WeakHashMap + +/** + * A [Flow] that can be used to emit events. + */ +public interface EventFlow<T> : Flow<T> { + /** + * Emit the specified [event]. + */ + public fun emit(event: T) + + /** + * Close the flow. + */ + public fun close() +} + +/** + * Creates a new [EventFlow]. + */ +@Suppress("FunctionName") +public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() + +/** + * Internal implementation of the [EventFlow] class. + */ +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +private class EventFlowImpl<T> : EventFlow<T> { + private var closed: Boolean = false + private val subscribers = WeakHashMap<SendChannel<T>, Unit>() + + override fun emit(event: T) { + synchronized(this) { + for ((chan, _) in subscribers) { + chan.offer(event) + } + } + } + + override fun close() { + synchronized(this) { + closed = true + + for ((chan, _) in subscribers) { + chan.close() + } + } + } + + @InternalCoroutinesApi + override suspend fun collect(collector: FlowCollector<T>) { + val channel: Channel<T> + synchronized(this) { + if (closed) { + return + } + + channel = Channel(Channel.UNLIMITED) + subscribers[channel] = Unit + } + channel.consumeAsFlow().collect(collector) + } + + override fun toString(): String = "EventFlow" +} diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/signal/Signal.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt index da6298a3..50add0ad 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/signal/Signal.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt @@ -22,13 +22,13 @@ * SOFTWARE. */ -package com.atlarge.odcsim.signal +package com.atlarge.odcsim.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.asFlow @@ -40,9 +40,9 @@ import kotlinx.coroutines.flow.asFlow * in the future, but is not available yet. * See: https://github.com/Kotlin/kotlinx.coroutines/pull/1354 */ -public interface Signal<T> : Flow<T> { +public interface StateFlow<T> : Flow<T> { /** - * The current value of this signal. + * The current value of this flow. * * Setting a value that is [equal][Any.equals] to the previous one does nothing. */ @@ -50,39 +50,30 @@ public interface Signal<T> : Flow<T> { } /** - * Creates a [Signal] with a given initial [value]. + * Creates a [StateFlow] with a given initial [value]. */ @Suppress("FunctionName") -public fun <T> Signal(value: T): Signal<T> = SignalImpl(value) +public fun <T> StateFlow(value: T): StateFlow<T> = StateFlowImpl(value) /** - * Internal implementation of the [Signal] interface. + * Internal implementation of the [StateFlow] interface. */ -private class SignalImpl<T>(initialValue: T) : Signal<T> { +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +private class StateFlowImpl<T>(initialValue: T) : StateFlow<T> { /** - * The [BroadcastChannel] to back this signal. + * The [BroadcastChannel] to back this flow. */ - @OptIn(ExperimentalCoroutinesApi::class) - private val chan = BroadcastChannel<T>(Channel.CONFLATED) + private val chan = ConflatedBroadcastChannel(initialValue) /** - * The internal [Flow] backing this signal. + * The internal [Flow] backing this flow. */ - @OptIn(FlowPreview::class) private val flow = chan.asFlow() - init { - @OptIn(ExperimentalCoroutinesApi::class) - chan.offer(initialValue) - } - - @OptIn(ExperimentalCoroutinesApi::class) public override var value: T = initialValue set(value) { - if (field != value) { - chan.offer(value) - field = value - } + chan.offer(value) + field = value } @InternalCoroutinesApi diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 4edf94d2..934af293 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -37,6 +37,7 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -174,6 +175,10 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine } } + private val exceptionHandler = CoroutineExceptionHandler { _, exception -> + log.error("Uncaught exception", exception) + } + // SimulationContext override val key: CoroutineContext.Key<*> = SimulationContext.Key @@ -192,7 +197,7 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine override val parent: Domain = parent ?: this @InternalCoroutinesApi - override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + exceptionHandler override fun toString(): String = path } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 86ec9a5b..01968cd8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -68,7 +68,12 @@ public data class Server( /** * The services published by this server. */ - public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl() + public val services: ServiceRegistry, + + /** + * The events that are emitted by the server. + */ + public val events: Flow<ServerEvent> ) : Resource { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt new file mode 100644 index 00000000..1595937c --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core + +import com.atlarge.opendc.core.services.ServiceKey + +/** + * An event that is emitted by a [Server]. + */ +public sealed class ServerEvent { + /** + * The server that emitted the event. + */ + public abstract val server: Server + + /** + * This event is emitted when the state of [server] changes. + * + * @property server The server of which the state changed. + * @property previousState The previous state of the server. + */ + public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() + + /** + * This event is emitted when a server publishes a service. + * + * @property server The server that published the service. + * @property key The service key of the service that was published. + */ + public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index b09a5a7d..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -27,10 +27,10 @@ package com.atlarge.opendc.compute.core.execution import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** @@ -44,11 +44,9 @@ public interface ServerContext { public val cpus: List<ProcessingUnit> /** - * Publishes the given [service] with key [serviceKey] in the server's registry. + * Publish the specified [service] at the given [ServiceKey]. */ - public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { - server.serviceRegistry[serviceKey] = service - } + public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) /** * Request the specified burst time from the processor cores and suspend execution until a processor core finishes diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt new file mode 100644 index 00000000..e4da557b --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.execution + +import kotlinx.coroutines.CancellationException + +/** + * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow + * has been sent to the server. + */ +public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message) + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a shutdown. + */ +public fun CancellationException.assertShutdown() { + if (this is ShutdownException) { + throw this + } +} + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a failure. + */ +public fun CancellationException.assertFailure() { + if (this is ShutdownException && cause != null) { + throw this + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 107237ea..e77b55a6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,7 +26,7 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.isActive +import kotlinx.coroutines.ensureActive import java.util.UUID import kotlin.coroutines.coroutineContext import kotlin.math.min @@ -64,11 +64,8 @@ data class FlopsApplicationImage( val burst = LongArray(cores) { flops / cores } val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - while (coroutineContext.isActive) { - if (burst.all { it == 0L }) { - break - } - + while (burst.any { it != 0L }) { + coroutineContext.ensureActive() ctx.run(burst, maxUsage, Long.MAX_VALUE) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt index 5fce3f48..a3a851fe 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt @@ -24,17 +24,11 @@ package com.atlarge.opendc.compute.metal -/** - * The power state of a compute node. +/* + * Common metadata keys for bare-metal nodes. */ -public enum class PowerState { - /** - * Node is powered on. - */ - POWER_ON, - /** - * Node is powered off. - */ - POWER_OFF, -} +/** + * The cluster to which the node belongs. + */ +const val NODE_CLUSTER = "bare-metal:cluster" diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index a43abfe9..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,12 +27,13 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -44,9 +45,14 @@ data class Node( public override val name: String, /** - * The power state of the node. + * Metadata of the node. */ - public val powerState: PowerState, + public val metadata: Map<String, Any>, + + /** + * The last known state of the compute node. + */ + public val state: NodeState, /** * The boot image of the node. @@ -56,7 +62,12 @@ data class Node( /** * The server instance that is running on the node or `null` if no server is running. */ - public val server: Server? + public val server: Server?, + + /** + * The events that are emitted by the node. + */ + public val events: Flow<NodeEvent> ) : Identity { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Node && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt index fbfd0ad6..7719db24 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt @@ -22,20 +22,22 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.core.monitor - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState +package com.atlarge.opendc.compute.metal /** - * An interface for monitoring the state of a machine. + * An event that is emitted by a [Node]. */ -public interface ServerMonitor { +public sealed class NodeEvent { + /** + * The node that emitted the event. + */ + public abstract val node: Node + /** - * This method is invoked when the state of a machine updates. + * This event is emitted when the state of [node] changes. * - * @param server The server which state was updated. - * @param previousState The previous state of the server. + * @property node The node of which the state changed. + * @property previousState The previous state of the node. */ - public suspend fun onUpdate(server: Server, previousState: ServerState) + public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt index 3c77d57a..ca9cf509 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt @@ -22,32 +22,34 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.monitor - -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task +package com.atlarge.opendc.compute.metal /** - * An interface for monitoring the progression of workflows. + * An enumeration describing the possible states of a bare-metal compute node. */ -public interface WorkflowMonitor { +public enum class NodeState { + /** + * The node is booting. + */ + BOOT, + /** - * This method is invoked when a job has become active. + * The node is powered off. */ - public suspend fun onJobStart(job: Job, time: Long) + SHUTOFF, /** - * This method is invoked when a job has finished processing. + * The node is active and running. */ - public suspend fun onJobFinish(job: Job, time: Long) + ACTIVE, /** - * This method is invoked when a task of a job has started processing. + * The node is in error. */ - public suspend fun onTaskStart(job: Job, task: Task, time: Long) + ERROR, /** - * This method is invoked when a task has finished processing. + * The state of the node is unknown. */ - public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) + UNKNOWN, } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 1214dd36..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -26,9 +26,8 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState +import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow @@ -37,7 +36,12 @@ import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. */ -public interface BareMetalDriver : Powerable { +public interface BareMetalDriver : Powerable, FailureDomain { + /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow<Node> + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. @@ -47,12 +51,22 @@ public interface BareMetalDriver : Powerable { /** * Initialize the driver. */ - public suspend fun init(monitor: ServerMonitor): Node + public suspend fun init(): Node + + /** + * Start the bare metal node with the specified boot disk image. + */ + public suspend fun start(): Node + + /** + * Stop the bare metal node if it is running. + */ + public suspend fun stop(): Node /** - * Update the power state of the compute node. + * Reboot the bare metal node. */ - public suspend fun setPower(powerState: PowerState): Node + public suspend fun reboot(): Node /** * Update the boot disk image of the compute node. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index c7dc74cf..4a40dc9f 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -25,23 +25,31 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.signal.Signal +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.flow.StateFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState +import com.atlarge.opendc.compute.metal.NodeEvent +import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch @@ -50,6 +58,7 @@ import kotlin.math.ceil import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.withContext +import java.lang.Exception /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -57,6 +66,7 @@ import kotlinx.coroutines.withContext * @param domain The simulation domain the driver runs in. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. + * @param metadata The initial metadata of the node. * @param cpus The CPUs available to the bare metal machine. * @param memoryUnits The memory units in this machine. * @param powerModel The power model of this machine. @@ -65,128 +75,175 @@ public class SimpleBareMetalDriver( private val domain: Domain, uid: UUID, name: String, + metadata: Map<String, Any>, val cpus: List<ProcessingUnit>, val memoryUnits: List<MemoryUnit>, - powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0) + powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel( + 0.0 + ) ) : BareMetalDriver { /** - * The monitor to use. + * The flavor that corresponds to this machine. */ - private lateinit var monitor: ServerMonitor + private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) /** - * The machine state. + * The current active server context. */ - private var node: Node = Node(uid, name, PowerState.POWER_OFF, EmptyImage, null) + private var serverContext: BareMetalServerContext? = null /** - * The flavor that corresponds to this machine. + * The events of the machine. */ - private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) + private val events = EventFlow<NodeEvent>() /** - * The job that is running the image. + * The flow containing the load of the server. */ - private var job: Job? = null + private val usageState = StateFlow(0.0) /** - * The signal containing the load of the server. + * The machine state. */ - private val usageSignal = Signal(0.0) + private val nodeState = StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow<Node> = nodeState - override val usage: Flow<Double> = usageSignal + override val usage: Flow<Double> = usageState override val powerDraw: Flow<Double> = powerModel(this) - override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { - this@SimpleBareMetalDriver.monitor = monitor - return@withContext node + override suspend fun init(): Node = withContext(domain.coroutineContext) { + nodeState.value } - override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) { - val previousPowerState = node.powerState - val server = when (node.powerState to powerState) { - PowerState.POWER_OFF to PowerState.POWER_OFF -> null - PowerState.POWER_OFF to PowerState.POWER_ON -> Server( - UUID.randomUUID(), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD - ) - PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image - PowerState.POWER_ON to PowerState.POWER_ON -> node.server - else -> throw IllegalStateException() + override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value + if (node.state != NodeState.SHUTOFF) { + return@withContext node } - server?.serviceRegistry?.set(BareMetalDriver.Key, this@SimpleBareMetalDriver) - node = node.copy(powerState = powerState, server = server) - if (powerState != previousPowerState && server != null) { - launch() + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), + node.name, + emptyMap(), + flavor, + node.image, + ServerState.BUILD, + ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver), + events + ) + + setNode(node.copy(state = NodeState.BOOT, server = server)) + serverContext = BareMetalServerContext(events) + return@withContext nodeState.value + } + + override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value + if (node.state == NodeState.SHUTOFF) { + return@withContext node } + // We terminate the image running on the machine + serverContext!!.cancel(fail = false) + serverContext = null + + setNode(node.copy(state = NodeState.SHUTOFF, server = null)) return@withContext node } + override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + stop() + start() + } + override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + setNode(nodeState.value.copy(image = image)) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } - /** - * Launch the server image on the machine. - */ - private suspend fun launch() { - val serverContext = serverCtx + private fun setNode(value: Node) { + val field = nodeState.value + if (field.state != value.state) { + events.emit(NodeEvent.StateChanged(value, field.state)) + } - job = domain.launch { - serverContext.init() - try { - node.server!!.image(serverContext) - serverContext.exit() - } catch (cause: Throwable) { - serverContext.exit(cause) - } + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } + + nodeState.value = value } - private val serverCtx = object : ServerManagementContext { - private var initialized: Boolean = false + private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { + private var finalized: Boolean = false override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus - override var server: Server - get() = node.server!! - set(value) { - node = node.copy(server = value) + override val server: Server + get() = nodeState.value.server!! + + private val job = domain.launch { + delay(1) // TODO Introduce boot time + init() + try { + server.image(this@BareMetalServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) } + } + + /** + * Cancel the image running on the machine. + */ + suspend fun cancel(fail: Boolean) { + if (fail) + job.cancel(ShutdownException(cause = Exception("Random failure"))) + else + job.cancel(ShutdownException()) + job.join() + } + + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + val server = server.copy(services = server.services.put(key, service)) + setNode(nodeState.value.copy(server = server)) + events.emit(ServerEvent.ServicePublished(server, key)) + } override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } + assert(!finalized) { "Machine is already finalized" } - val previousState = server.state - server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) - initialized = true + val server = server.copy(state = ServerState.ACTIVE) + setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) } override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) - initialized = false - domain.launch { monitor.onUpdate(server, previousState) } + finalized = true + + val newServerState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + val newNodeState = + if (cause == null || (cause is ShutdownException && cause.cause != null)) + nodeState.value.state + else + NodeState.ERROR + val server = server.copy(state = newServerState) + setNode(nodeState.value.copy(state = newNodeState, server = server)) } private var flush: Job? = null override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { require(burst.size == limit.size) { "Array dimensions do not match" } + assert(!finalized) { "Server instance is already finalized" } // If run is called in at the same timestamp as the previous call, cancel the load flush flush?.cancel() @@ -209,19 +266,20 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst + } catch (e: CancellationException) { + // On non-failure cancellation, we compute and return the remaining burst + e.assertFailure() } val end = simulationContext.clock.millis() // Flush the load if the do not receive a new run call for the same timestamp - flush = domain.launch { + flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -235,4 +293,14 @@ public class SimpleBareMetalDriver( } } } + + override val scope: CoroutineScope + get() = domain + + override suspend fun fail() { + serverContext?.cancel(fail = true) + domain.cancel() + } + + override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})" } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt index 24ade799..105505f2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.core.services.AbstractServiceKey @@ -53,7 +52,7 @@ public interface ProvisioningService { /** * Deploy the specified [Image] on a compute node. */ - public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node + public suspend fun deploy(node: Node, image: Image): Node /** * The service key of this service. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index b18a4006..a7e143aa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -25,31 +25,22 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.Domain -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - /** - * The installed monitors. - */ - private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { - val node = driver.init(this@SimpleProvisioningService) + val node = driver.init() nodes[node] = driver return@withContext node } @@ -60,19 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! - driver.setImage(image) - driver.setPower(PowerState.POWER_OFF) - val newNode = driver.setPower(PowerState.POWER_ON) - monitors[newNode.server!!] = monitor + val newNode = driver.reboot() return@withContext newNode } - - override suspend fun onUpdate(server: Server, previousState: ServerState) { - withContext(domain.coroutineContext) { - monitors[server]?.onUpdate(server, previousState) - } - } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt new file mode 100644 index 00000000..69b0124d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. + */ +public class Hypervisor( + /** + * The unique identifier of the hypervisor. + */ + override val uid: UUID, + + /** + * The optional name of the hypervisor. + */ + override val name: String, + + /** + * Metadata of the hypervisor. + */ + public val metadata: Map<String, Any>, + + /** + * The events that are emitted by the hypervisor. + */ + public val events: Flow<HypervisorEvent> +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt new file mode 100644 index 00000000..5c19b00d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +/** + * An event that is emitted by a [VirtDriver]. + */ +public sealed class HypervisorEvent { + /** + * The driver that emitted the event. + */ + public abstract val driver: VirtDriver + + /** + * This event is emitted when the number of active servers on the server managed by this driver is updated. + * + * @property driver The driver that emitted the event. + * @property numberOfActiveServers The number of active servers. + * @property availableMemory The available memory, in MB. + */ + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() + + /** + * This event is emitted when a slice is finished. + * + * @property driver The driver that emitted the event. + * @property requestedBurst The total requested CPU time (can be above capacity). + * @property grantedBurst The actual total granted capacity. + * @property numberOfDeployedImages The number of images deployed on this hypervisor. + */ + public data class SliceFinished( + override val driver: VirtDriver, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val numberOfDeployedImages: Int, + public val hostServer: Server + ) : HypervisorEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 8d055953..c21b002d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -22,32 +22,36 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import java.util.UUID /** * A hypervisor managing the VMs of a node. */ -class HypervisorImage( - private val hypervisorMonitor: HypervisorMonitor -) : Image { +object HypervisorImage : Image { override val uid: UUID = UUID.randomUUID() override val name: String = "vmm" override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) + coroutineScope { + val driver = SimpleVirtDriver(ctx, this) + ctx.publishService(VirtDriver.Key, driver) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - suspendCancellableCoroutine<Unit> {} + // Suspend image until it is cancelled + try { + suspendCancellableCoroutine<Unit> {} + } finally { + driver.eventFlow.close() + } + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt index 926234b5..0586ae00 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 430e5a37..76368080 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -22,26 +22,34 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver -import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -51,11 +59,18 @@ import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ -class HypervisorVirtDriver( +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +class SimpleVirtDriver( private val hostContext: ServerContext, - private val monitor: HypervisorMonitor + private val coroutineScope: CoroutineScope ) : VirtDriver { /** + * The [Server] on which this hypervisor runs. + */ + private val server: Server + get() = hostContext.server + + /** * A set for tracking the VM context objects. */ internal val vms: MutableSet<VmServerContext> = mutableSetOf() @@ -66,31 +81,38 @@ class HypervisorVirtDriver( private var availableMemory: Long = hostContext.server.flavor.memorySize /** - * Monitors to keep informed. + * The [EventFlow] to emit the events. */ - private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf() + internal val eventFlow = EventFlow<HypervisorEvent>() - override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server { + override val events: Flow<HypervisorEvent> = eventFlow + + override suspend fun spawn( + name: String, + image: Image, + flavor: Flavor + ): Server { val requiredMemory = flavor.memorySize if (availableMemory - requiredMemory < 0) { throw InsufficientMemoryOnServerException() } require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, + ServiceRegistry(), events + ) availableMemory -= requiredMemory - vms.add(VmServerContext(server, monitor, simulationContext)) - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + vms.add(VmServerContext(server, events, simulationContext.domain)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } - override suspend fun addMonitor(monitor: VirtDriverMonitor) { - monitors.add(monitor) - } - - override suspend fun removeMonitor(monitor: VirtDriverMonitor) { - monitors.remove(monitor) - } + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false /** * The set of [VmServerContext] instances that is being scheduled at the moment. @@ -108,12 +130,12 @@ class HypervisorVirtDriver( private suspend fun reschedule() { flush() - // Do not schedule a call if there is no work to schedule - if (activeVms.isEmpty()) { + // Do not schedule a call if there is no work to schedule or the driver stopped. + if (stopped || activeVms.isEmpty()) { return } - val call = simulationContext.domain.launch { + val call = coroutineScope.launch { val start = simulationContext.clock.millis() val vms = activeVms.toSet() @@ -200,16 +222,9 @@ class HypervisorVirtDriver( } } - monitor.onSliceFinish( - end, - totalBurst, - totalBurst - totalRemainder, - vms.size, - hostContext.server - ) + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server)) } this.call = call - call.invokeOnCompletion { this.call = null } } /** @@ -217,9 +232,10 @@ class HypervisorVirtDriver( */ private fun flush() { val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its // completion. call.cancel() + this.call = null } /** @@ -238,17 +254,19 @@ class HypervisorVirtDriver( } internal inner class VmServerContext( - override var server: Server, - val monitor: ServerMonitor, - ctx: SimulationContext + server: Server, + val events: EventFlow<ServerEvent>, + val domain: Domain ) : ServerManagementContext { + private var finalized: Boolean = false lateinit var requests: List<CpuRequest> lateinit var burst: LongArray var deadline: Long = 0L var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false - internal val job: Job = ctx.domain.launch { + internal val job: Job = coroutineScope.launch { + delay(1) // TODO Introduce boot time init() try { server.image(this@VmServerContext) @@ -258,28 +276,42 @@ class HypervisorVirtDriver( } } + override var server: Server = server + set(value) { + if (field.state != value.state) { + events.emit(ServerEvent.StateChanged(value, field.state)) + } + + field = value + } + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + server = server.copy(services = server.services.put(key, service)) + events.emit(ServerEvent.ServicePublished(server, key)) + } + override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } + assert(!finalized) { "VM is already finalized" } - val previousState = server.state server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) initialized = true } override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + server = server.copy(state = serverState) availableMemory += server.flavor.memorySize - monitor.onUpdate(server, previousState) - initialized = false vms.remove(this) - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + events.close() + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { @@ -289,7 +321,14 @@ class HypervisorVirtDriver( this.burst = burst requests = cpus.asSequence() .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } .toList() // Wait until the burst has been run or the coroutine is cancelled @@ -297,11 +336,13 @@ class HypervisorVirtDriver( activeVms += this reschedule() chan.receive() - } catch (_: CancellationException) { + } catch (e: CancellationException) { // On cancellation, we compute and return the remaining burst + e.assertFailure() + } finally { + activeVms -= this + reschedule() } - activeVms -= this - reschedule() } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index d889d0f9..1002d382 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,8 +27,9 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -37,28 +38,19 @@ import java.util.UUID */ public interface VirtDriver { /** + * The events emitted by the driver. + */ + public val events: Flow<HypervisorEvent> + + /** * Spawn the given [Image] on the compute resource of this driver. * + * @param name The name of the server to spawn. * @param image The image to deploy. - * @param monitor The monitor to use for the deployment of this particular image. * @param flavor The flavor of the server which this driver is controlling. * @return The virtual server spawned by this method. */ - public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server - - /** - * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver. - * - * @param monitor The monitor to keep informed. - */ - public suspend fun addMonitor(monitor: VirtDriverMonitor) - - /** - * Removes the given [VirtDriverMonitor] from the list of monitors. - * - * @param monitor The monitor to unsubscribe - */ - public suspend fun removeMonitor(monitor: VirtDriverMonitor) + public suspend fun spawn(name: String, image: Image, flavor: Flavor): Server companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt deleted file mode 100644 index cf2f4619..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt +++ /dev/null @@ -1,14 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver - -/** - * Monitor for entities interested in the state of a [VirtDriver]. - */ -interface VirtDriverMonitor { - /** - * Called when the number of active servers on the server managed by this driver is updated. - * - * @param numberOfActiveServers The number of active servers. - * @param availableMemory The available memory, in MB. - */ - public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt deleted file mode 100644 index 1e3981f6..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.atlarge.opendc.compute.virt.monitor - -import com.atlarge.opendc.compute.core.Server - -/** - * Monitoring interface for hypervisor-specific events. - */ -interface HypervisorMonitor { - /** - * Invoked after a scheduling slice has finished processed. - * - * @param time The current time (in ms). - * @param requestedBurst The total requested CPU time (can be above capacity). - * @param grantedBurst The actual total granted capacity. - * @param numberOfDeployedImages The number of images deployed on this hypervisor. - * @param hostServer The server hosting this hypervisor. - */ - suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt new file mode 100644 index 00000000..97842f18 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt @@ -0,0 +1,12 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +class HypervisorView( + var server: Server, + var numberOfActiveServers: Int, + var availableMemory: Long +) { + lateinit var driver: VirtDriver +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt deleted file mode 100644 index 41e67624..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt +++ /dev/null @@ -1,11 +0,0 @@ -package com.atlarge.opendc.compute.virt.service - -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage - -class NodeView( - val node: Node, - val hypervisor: HypervisorImage, - var numberOfActiveServers: Int, - var availableMemory: Long -) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 17960186..156521db 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -3,123 +3,175 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.yield +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +@OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, - private val provisioningService: ProvisioningService, - private val hypervisorMonitor: HypervisorMonitor -) : VirtProvisioningService, ServerMonitor { + private val provisioningService: ProvisioningService +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** - * The nodes that are controlled by the service. + * The hypervisors that have been launched by the service. */ - internal lateinit var nodes: List<Node> + private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf() /** - * The available nodes. + * The available hypervisors. */ - internal val availableNodes: MutableSet<NodeView> = mutableSetOf() + private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() /** * The incoming images to be processed by the provisioner. */ - internal val incomingImages: MutableSet<ImageView> = mutableSetOf() + private val incomingImages: MutableSet<ImageView> = mutableSetOf() /** * The active images in the system. */ - internal val activeImages: MutableSet<ImageView> = mutableSetOf() + private val activeImages: MutableSet<ImageView> = mutableSetOf() init { - ctx.domain.launch { - val provisionedNodes = provisioningService.nodes().toList() - val deployedNodes = provisionedNodes.map { node -> - val hypervisorImage = HypervisorImage(hypervisorMonitor) - val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) - val nodeView = NodeView( - deployedNode, - hypervisorImage, - 0, - deployedNode.server!!.flavor.memorySize - ) - yield() - deployedNode.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { - override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { - nodeView.numberOfActiveServers = numberOfActiveServers - nodeView.availableMemory = availableMemory + launch { + val provisionedNodes = provisioningService.nodes() + provisionedNodes.forEach { node -> + val hypervisorImage = HypervisorImage + val node = provisioningService.deploy(node, hypervisorImage) + node.server!!.events.onEach { event -> + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - }) - nodeView + }.launchIn(this) } - nodes = deployedNodes.map { it.node } - availableNodes.addAll(deployedNodes) } } - override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) { - val vmInstance = ImageView(image, monitor, flavor) - incomingImages += vmInstance - requestCycle() + override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { + availableHypervisors.map { it.driver }.toSet() + } + + override suspend fun deploy( + name: String, + image: Image, + flavor: Flavor + ): Server = withContext(coroutineContext) { + suspendCancellableCoroutine<Server> { cont -> + val vmInstance = ImageView(name, image, flavor, cont) + incomingImages += vmInstance + requestCycle() + } } + private var call: Job? = null + private fun requestCycle() { - ctx.domain.launch { + if (call != null) { + return + } + + val call = launch { + delay(1) + this@SimpleVirtProvisioningService.call = null schedule() } + this.call = call } private suspend fun schedule() { val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - println("Spawning $imageInstance") + val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break + try { + println("Spawning ${imageInstance.image}") + incomingImages -= imageInstance - val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid }) + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack - try { - imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn( + val server = selectedHv.driver.spawn( + imageInstance.name, imageInstance.image, - imageInstance.monitor, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") - } - incomingImages -= imageInstance + selectedHv.numberOfActiveServers-- + selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory + } } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - // TODO handle hypervisor server becoming active + val hvView = HypervisorView( + server, + 0, + server.flavor.memorySize + ) + hypervisors[server] = hvView } ServerState.SHUTOFF, ServerState.ERROR -> { - // TODO handle hypervisor server shutting down or failing + val hv = hypervisors[server] ?: return + availableHypervisors -= hv + requestCycle() } else -> throw IllegalStateException() } } - class ImageView( + private fun servicePublished(server: Server, key: ServiceKey<*>) { + if (key == VirtDriver.Key) { + val hv = hypervisors[server] ?: return + hv.driver = server.services[VirtDriver] + availableHypervisors += hv + + hv.driver.events + .onEach { event -> + if (event is HypervisorEvent.VmsUpdated) { + hv.numberOfActiveServers = event.numberOfActiveServers + hv.availableMemory = event.availableMemory + } + }.launchIn(this) + + requestCycle() + } + } + + data class ImageView( + val name: String, val image: Image, - val monitor: ServerMonitor, val flavor: Flavor, + val continuation: Continuation<Server>, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 7770ec50..a3ade2fb 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,8 +1,9 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy /** @@ -12,11 +13,16 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * Obtain the active hypervisors for this provisioner. + */ + public suspend fun drivers(): Set<VirtDriver> + + /** * Submit the specified [Image] to the provisioning service. * + * @param name The name of the server to deploy. * @param image The image to be deployed. - * @param monitor The monitor to inform on events. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) + public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt index a1c0ab9a..e2871cca 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt @@ -1,7 +1,7 @@ package com.atlarge.opendc.compute.virt.service.allocation import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView /** * A policy for selecting the [Node] an image should be deployed to, @@ -10,5 +10,5 @@ interface AllocationPolicy { /** * Builds the logic of the policy. */ - operator fun invoke(): Comparator<NodeView> + operator fun invoke(): Comparator<HypervisorView> } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt index b3e9d77e..f095849b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt @@ -1,12 +1,12 @@ package com.atlarge.opendc.compute.virt.service.allocation -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView /** * Allocation policy that selects the node with the most available memory. */ class AvailableMemoryAllocationPolicy : AllocationPolicy { - override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 -> compareValuesBy(o1, o2) { -it.availableMemory } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt index 9d6582dd..59e48465 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt @@ -1,13 +1,13 @@ package com.atlarge.opendc.compute.virt.service.allocation -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView import kotlinx.coroutines.runBlocking /** * Allocation policy that selects the node with the least amount of active servers. */ class NumberOfActiveServersAllocationPolicy : AllocationPolicy { - override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 -> runBlocking { compareValuesBy(o1, o2) { it.numberOfActiveServers } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index b8882eda..0fc64373 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,14 +25,12 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.PowerState +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -55,21 +53,19 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) - - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}] $server") - finalState = server.state - } - } + val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands withContext(dom.coroutineContext) { - driver.init(monitor) + driver.init() driver.setImage(image) - driver.setPower(PowerState.POWER_ON) + val server = driver.start().server!! + server.events.collect { event -> + when (event) { + is ServerEvent.StateChanged -> { println(event); finalState = event.server.state } + } + } } } @@ -78,6 +74,6 @@ internal class SimpleBareMetalDriverTest { system.terminate() } - assertEquals(finalState, ServerState.SHUTOFF) + assertEquals(ServerState.SHUTOFF, finalState) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index a837130d..f8bd786e 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -27,12 +27,10 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -53,23 +51,18 @@ internal class SimpleProvisioningServiceTest { val root = system.newDomain(name = "root") root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) - } - } - val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) delay(5) val nodes = provisioner.nodes() - provisioner.deploy(nodes.first(), image, monitor) + val node = provisioner.deploy(nodes.first(), image) + node.server!!.events.collect { println(it) } } runBlocking { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 254ad5fe..58d784b0 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -22,22 +22,19 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -51,6 +48,7 @@ internal class HypervisorTest { /** * A smoke test for the bare-metal driver. */ + @OptIn(ExperimentalCoroutinesApi::class) @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() @@ -58,41 +56,29 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { - val vmm = HypervisorImage(object : HypervisorMonitor { - override suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) { - println("Hello World!") - } - }) + val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}]: $server") - } - } val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - metalDriver.init(monitor) + metalDriver.init() metalDriver.setImage(vmm) - metalDriver.setPower(PowerState.POWER_ON) + val node = metalDriver.start() + node.server?.events?.onEach { println(it) }?.launchIn(this) delay(5) val flavor = Flavor(1, 0) - val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] - vmDriver.spawn(workloadA, monitor, flavor) - vmDriver.spawn(workloadB, monitor, flavor) + val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] + val vmA = vmDriver.spawn("a", workloadA, flavor) + vmA.events.onEach { println(it) }.launchIn(this) + val vmB = vmDriver.spawn("b", workloadB, flavor) + vmB.events.onEach { println(it) }.launchIn(this) } runBlocking { diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt new file mode 100644 index 00000000..da4dee12 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -0,0 +1,110 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.failure + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.launch +import kotlin.math.exp +import kotlin.random.Random +import kotlin.random.asJavaRandom + +/** + * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in + * isolation, but will trigger other faults. + */ +public class CorrelatedFaultInjector( + private val domain: Domain, + private val iatScale: Double, + private val iatShape: Double, + private val sizeScale: Double, + private val sizeShape: Double, + random: Random = Random +) : FaultInjector { + /** + * The active failure domains that have been registered. + */ + private val active = mutableSetOf<FailureDomain>() + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * The [Random] instance to use. + */ + private val random: java.util.Random = random.asJavaRandom() + + /** + * Enqueue the specified [FailureDomain] to fail some time in the future. + */ + override fun enqueue(domain: FailureDomain) { + active += domain + + // Clean up the domain if it finishes + domain.scope.coroutineContext[Job]!!.invokeOnCompletion { + this@CorrelatedFaultInjector.domain.launch { + active -= domain + + if (active.isEmpty()) { + job?.cancel() + job = null + } + } + } + + if (job != null) { + return + } + + job = this.domain.launch { + while (true) { + ensureActive() + + // Make sure to convert delay from hours to milliseconds + val d = lognvariate(iatScale, iatShape) * 3600 * 1e6 + + // Handle long overflow + if (simulationContext.clock.millis() + d <= 0) { + return@launch + } + + delay(d.toLong()) + + val n = lognvariate(sizeScale, sizeShape).toInt() + for (failureDomain in active.shuffled(random).take(n)) { + failureDomain.fail() + } + } + } + } + + // XXX We should extract this in some common package later on. + private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian()) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt new file mode 100644 index 00000000..91ca9b83 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.failure + +import kotlinx.coroutines.CoroutineScope + +/** + * A logical or physical component in a computing environment which may fail. + */ +public interface FailureDomain { + /** + * The lifecycle of the failure domain to which a [FaultInjector] will attach. + */ + public val scope: CoroutineScope + + /** + * Fail the domain externally. + */ + public suspend fun fail() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt new file mode 100644 index 00000000..ac7a08de --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.failure + +/** + * An interface for stochastically injecting faults into a running system. + */ +public interface FaultInjector { + /** + * Enqueue the specified [FailureDomain] into the queue as candidate for failure injection in the future. + */ + public fun enqueue(domain: FailureDomain) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt new file mode 100644 index 00000000..3883eb11 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.failure + +import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlin.math.ln1p +import kotlin.math.pow +import kotlin.random.Random + +/** + * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are + * independent. + */ +public class UncorrelatedFaultInjector(private val alpha: Double, private val beta: Double, private val random: Random = Random) : FaultInjector { + /** + * Enqueue the specified [FailureDomain] to fail some time in the future. + */ + override fun enqueue(domain: FailureDomain) { + domain.scope.launch { + val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds + + // Handle long overflow + if (simulationContext.clock.millis() + d <= 0) { + return@launch + } + + delay(d.toLong()) + domain.fail() + } + } + + // XXX We should extract this in some common package later on. + private fun Random.weibull(alpha: Double, beta: Double) = (beta * (-ln1p(-nextDouble())).pow(1.0 / alpha)) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt index d9a85231..75aa778f 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt @@ -25,10 +25,15 @@ package com.atlarge.opendc.core.services /** - * A service registry for a datacenter zone. + * An immutable service registry interface. */ public interface ServiceRegistry { /** + * The keys in this registry. + */ + public val keys: Collection<ServiceKey<*>> + + /** * Determine if this map contains the service with the specified [ServiceKey]. * * @param key The key of the service to check for. @@ -41,12 +46,18 @@ public interface ServiceRegistry { * * @param key The key of the service to obtain. * @return The references to the service. - * @throws IllegalArgumentException if the key does not exists in the map. + * @throws IllegalArgumentException if the key does not exist in the map. */ public operator fun <T : Any> get(key: ServiceKey<T>): T /** - * Register the specified [ServiceKey] in this registry. + * Return the result of associating the specified [service] with the given [key] in this registry. */ - public operator fun <T : Any> set(key: ServiceKey<T>, service: T) + public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry } + +/** + * Construct an empty [ServiceRegistry]. + */ +@Suppress("FunctionName") +public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap()) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt index 91147839..0686ebaf 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt @@ -27,20 +27,18 @@ package com.atlarge.opendc.core.services /** * Default implementation of the [ServiceRegistry] interface. */ -public class ServiceRegistryImpl : ServiceRegistry { - /** - * The map containing the registered services. - */ - private val services: MutableMap<ServiceKey<*>, Any> = mutableMapOf() +internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry { + override val keys: Collection<ServiceKey<*>> + get() = map.keys - override fun <T : Any> set(key: ServiceKey<T>, service: T) { - services[key] = service - } - - override fun contains(key: ServiceKey<*>): Boolean = key in services + override fun contains(key: ServiceKey<*>): Boolean = key in map override fun <T : Any> get(key: ServiceKey<T>): T { @Suppress("UNCHECKED_CAST") - return services[key] as T + return map[key] as T } + + override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry = ServiceRegistryImpl(map.plus(key to service)) + + override fun toString(): String = map.toString() } diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index d5e1404a..b0182ab3 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.WorkflowEvent import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy @@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -62,28 +62,6 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - println("Job ${job.uid} started") - } - - override suspend fun onJobFinish(job: Job, time: Long) { - finished += 1 - println("Jobs $finished/$total finished (${job.tasks.size} tasks)") - - if (finished == total) { - token.send(true) - } - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -106,6 +84,27 @@ fun main(args: Array<String>) { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> { + println("Job ${event.job.uid} started") + } + is WorkflowEvent.JobFinished -> { + finished += 1 + println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) + } + } + } + } + .collect() + } broker.launch { val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) @@ -115,7 +114,7 @@ fun main(args: Array<String>) { val (time, job) = reader.next() total += 1 delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index d3b37336..28b8ae12 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -41,7 +41,9 @@ dependencies { implementation("com.xenomachina:kotlin-argparser:2.0.7") api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") runtimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 9e8f0fa8..36da7703 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,23 +1,32 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable import java.io.FileWriter -class Sc20HypervisorMonitor( +class Sc20Monitor( destination: String -) : HypervisorMonitor, Closeable { +) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) + private var failed: Int = 0 init { - outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n") + outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override suspend fun onSliceFinish( + suspend fun stateChanged(server: Server) { + println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") + if (server.state == ServerState.ERROR) { + failed++ + } + } + + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -25,11 +34,11 @@ class Sc20HypervisorMonitor( hostServer: Server ) { // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.serviceRegistry[BareMetalDriver.Key] + val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw") + outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed") outputFile.newLine() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index f0d3fc8d..639c3aef 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -24,15 +24,19 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -40,11 +44,15 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File -import java.io.FileInputStream import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max @@ -81,27 +89,35 @@ class ExperimentParameters(parser: ArgParser) { } /** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + return CorrelatedFaultInjector(domain, + iatScale = -1.39, iatShape = 1.03, + sizeScale = 1.88, sizeShape = 1.25 + ) +} + +/** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { ArgParser(args).parseInto(::ExperimentParameters).run { - val hypervisorMonitor = Sc20HypervisorMonitor(outputFile) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) - } - } + val monitor = Sc20Monitor(outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") val root = system.newDomain("root") + val chan = Channel<Unit>(Channel.CONFLATED) root.launch { val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) .use { it.construct(root) } val performanceInterferenceStream = if (performanceInterferenceFile != null) { - FileInputStream(File(performanceInterferenceFile!!)) + File(performanceInterferenceFile!!).inputStream().buffered() } else { object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } @@ -111,18 +127,56 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - environment.platforms[0].zones[0].services[ProvisioningService.Key], - hypervisorMonitor + bareMetalProvisioner ) + // Wait for the hypervisors to be spawned + delay(10) + + // Monitor hypervisor events + for (hypervisor in scheduler.drivers()) { + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer) + else -> println(event) + } + } + .launchIn(this) + } + + root.newDomain(name = "failures").launch { + chan.receive() + val injectors = mutableMapOf<String, FaultInjector>() + + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.cores, workload.image.requiredMemory) + ) + // Monitor server events + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) @@ -134,6 +188,6 @@ fun main(args: Array<String>) { } // Explicitly close the monitor to flush its buffer - hypervisorMonitor.close() + monitor.close() } } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 0d4bd125..6f6aa616 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -34,7 +34,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -77,7 +77,14 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver(dom.newDomain("node-$counter"), UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimpleBareMetalDriver( + dom.newDomain("node-$counter"), + UUID.randomUUID(), + "node-${counter++}", + emptyMap(), + cores, + listOf(MemoryUnit("", "", 2300.0, 16000)) + ) } } } @@ -89,9 +96,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService - + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc18-platform", listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index ae0ba550..708e27bf 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -28,6 +28,7 @@ import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel import com.atlarge.opendc.compute.metal.service.ProvisioningService @@ -35,7 +36,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File @@ -100,13 +101,14 @@ class Sc20ClusterEnvironmentReader( dom.newDomain("node-$clusterId-$it"), UUID.randomUUID(), "node-$clusterId-$it", + mapOf(NODE_CLUSTER to clusterId), List(coresPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, speed) }, - listOf(unknownMemoryUnit), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 + listOf(unknownMemoryUnit), LinearLoadPowerModel(200.0, 350.0) ) ) @@ -119,8 +121,7 @@ class Sc20ClusterEnvironmentReader( provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc20-platform", listOf( diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index a954a308..4b5d6fb7 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -85,11 +85,12 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb dom.newDomain("node-$counter"), UUID.randomUUID(), "node-${counter++}", + emptyMap(), cores, - memories, // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 + memories, LinearLoadPowerModel(200.0, 350.0) ) } @@ -103,8 +104,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc20-platform", listOf( diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt index b444f91c..1cb2de97 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -24,10 +24,9 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { +class JobState(val job: Job, val submittedAt: Long) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 008cd1ee..7a20363c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,13 +25,13 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch @@ -58,7 +63,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, ServerMonitor { +) : WorkflowService, CoroutineScope by domain { /** * The incoming jobs ready to be processed by the scheduler. @@ -167,6 +172,7 @@ class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() init { domain.launch { @@ -183,9 +189,11 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) + val jobInstance = JobState(job, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -217,6 +225,7 @@ class StageWorkflowService( /** * Perform a scheduling cycle immediately. */ + @OptIn(ExperimentalCoroutinesApi::class) internal suspend fun schedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() @@ -232,7 +241,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) rootListener.jobStarted(jobInstance) } @@ -280,10 +289,13 @@ class StageWorkflowService( // T4 Submit task to machine available -= host instance.state = TaskStatus.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! instance.host = newHost - taskByServer[newHost.server!!] = instance + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) activeTasks += instance taskQueue.poll() @@ -294,12 +306,12 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { + private suspend fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { @@ -310,7 +322,7 @@ class StageWorkflowService( job.tasks.remove(task) available += task.host!! activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -335,7 +347,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt new file mode 100644 index 00000000..2ca5a19d --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * An event emitted by the [WorkflowService]. + */ +public sealed class WorkflowEvent { + /** + * The [WorkflowService] that emitted the event. + */ + public abstract val service: WorkflowService + + /** + * This event is emitted when a job has become active. + */ + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a job has finished processing. + */ + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index 524f4f9e..38ea49c4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -36,9 +36,14 @@ import java.util.UUID */ public interface WorkflowService { /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** * Submit the specified [Job] to the workflow service for scheduling. */ - public suspend fun submit(job: Job, monitor: WorkflowMonitor) + public suspend fun submit(job: Job) /** * The service key for the workflow scheduler. diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt index 40389ce2..02969d8a 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -47,4 +47,6 @@ data class Job( override fun equals(other: Any?): Boolean = other is Job && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)" } diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 19e56482..5ee6d5e6 100644 --- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals @@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - jobsStarted++ - } - - override suspend fun onJobFinish(job: Job, time: Long) { - jobsFinished++ - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - tasksStarted++ - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - tasksFinished++ - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> jobsStarted++ + is WorkflowEvent.JobFinished -> jobsFinished++ + is WorkflowEvent.TaskStarted -> tasksStarted++ + is WorkflowEvent.TaskFinished -> tasksFinished++ + } + } + .collect() + } + broker.launch { val ctx = simulationContext val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) @@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val (time, job) = reader.next() jobsSubmitted++ delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } |
