summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-03-26 13:55:32 +0100
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-03-26 13:55:32 +0100
commit620f194c53d950a37f78577f4aacfd7c0c06bb9a (patch)
treef5f7ffdce8efdcffb92e158ebbb643ba1a797b23 /opendc/opendc-compute
parentf4ee29bb97aed68329e72710dd3049c23f592f25 (diff)
parent7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b (diff)
Merge branch 'feat/2.x-failures' into '2.x'
Implement basic hardware-level failures See merge request opendc/opendc-simulator!35
Diffstat (limited to 'opendc/opendc-compute')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt53
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt53
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt)18
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt19
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt)22
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt55
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt220
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt58
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt67
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt)24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt)2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt)145
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt25
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt11
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt158
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt26
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt15
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt (renamed from opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt)44
31 files changed, 781 insertions, 393 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 86ec9a5b..01968cd8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,7 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -68,7 +68,12 @@ public data class Server(
/**
* The services published by this server.
*/
- public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl()
+ public val services: ServiceRegistry,
+
+ /**
+ * The events that are emitted by the server.
+ */
+ public val events: Flow<ServerEvent>
) : Resource {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
new file mode 100644
index 00000000..1595937c
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
@@ -0,0 +1,53 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core
+
+import com.atlarge.opendc.core.services.ServiceKey
+
+/**
+ * An event that is emitted by a [Server].
+ */
+public sealed class ServerEvent {
+ /**
+ * The server that emitted the event.
+ */
+ public abstract val server: Server
+
+ /**
+ * This event is emitted when the state of [server] changes.
+ *
+ * @property server The server of which the state changed.
+ * @property previousState The previous state of the server.
+ */
+ public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent()
+
+ /**
+ * This event is emitted when a server publishes a service.
+ *
+ * @property server The server that published the service.
+ * @property key The service key of the service that was published.
+ */
+ public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index b09a5a7d..e0a491c8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -27,10 +27,10 @@ package com.atlarge.opendc.compute.core.execution
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.core.services.AbstractServiceKey
+import com.atlarge.opendc.core.services.ServiceKey
/**
- * Represents the execution context in which an bootable [Image] runs on a [Server].
+ * Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
@@ -44,11 +44,9 @@ public interface ServerContext {
public val cpus: List<ProcessingUnit>
/**
- * Publishes the given [service] with key [serviceKey] in the server's registry.
+ * Publish the specified [service] at the given [ServiceKey].
*/
- public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) {
- server.serviceRegistry[serviceKey] = service
- }
+ public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
/**
* Request the specified burst time from the processor cores and suspend execution until a processor core finishes
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
new file mode 100644
index 00000000..e4da557b
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
@@ -0,0 +1,53 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core.execution
+
+import kotlinx.coroutines.CancellationException
+
+/**
+ * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow
+ * has been sent to the server.
+ */
+public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message)
+
+/**
+ * This method terminates the current active coroutine if the specified [CancellationException] is caused
+ * by a shutdown.
+ */
+public fun CancellationException.assertShutdown() {
+ if (this is ShutdownException) {
+ throw this
+ }
+}
+
+/**
+ * This method terminates the current active coroutine if the specified [CancellationException] is caused
+ * by a failure.
+ */
+public fun CancellationException.assertFailure() {
+ if (this is ShutdownException && cause != null) {
+ throw this
+ }
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index 107237ea..e77b55a6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,7 +26,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.isActive
+import kotlinx.coroutines.ensureActive
import java.util.UUID
import kotlin.coroutines.coroutineContext
import kotlin.math.min
@@ -64,11 +64,8 @@ data class FlopsApplicationImage(
val burst = LongArray(cores) { flops / cores }
val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- while (coroutineContext.isActive) {
- if (burst.all { it == 0L }) {
- break
- }
-
+ while (burst.any { it != 0L }) {
+ coroutineContext.ensureActive()
ctx.run(burst, maxUsage, Long.MAX_VALUE)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt
index 5fce3f48..a3a851fe 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt
@@ -24,17 +24,11 @@
package com.atlarge.opendc.compute.metal
-/**
- * The power state of a compute node.
+/*
+ * Common metadata keys for bare-metal nodes.
*/
-public enum class PowerState {
- /**
- * Node is powered on.
- */
- POWER_ON,
- /**
- * Node is powered off.
- */
- POWER_OFF,
-}
+/**
+ * The cluster to which the node belongs.
+ */
+const val NODE_CLUSTER = "bare-metal:cluster"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index a43abfe9..7cb4c0c5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,12 +27,13 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
* A bare-metal compute node.
*/
-data class Node(
+public data class Node(
/**
* The unique identifier of the node.
*/
@@ -44,9 +45,14 @@ data class Node(
public override val name: String,
/**
- * The power state of the node.
+ * Metadata of the node.
*/
- public val powerState: PowerState,
+ public val metadata: Map<String, Any>,
+
+ /**
+ * The last known state of the compute node.
+ */
+ public val state: NodeState,
/**
* The boot image of the node.
@@ -56,7 +62,12 @@ data class Node(
/**
* The server instance that is running on the node or `null` if no server is running.
*/
- public val server: Server?
+ public val server: Server?,
+
+ /**
+ * The events that are emitted by the node.
+ */
+ public val events: Flow<NodeEvent>
) : Identity {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Node && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
index fbfd0ad6..7719db24 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
@@ -22,20 +22,22 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.core.monitor
-
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
+package com.atlarge.opendc.compute.metal
/**
- * An interface for monitoring the state of a machine.
+ * An event that is emitted by a [Node].
*/
-public interface ServerMonitor {
+public sealed class NodeEvent {
+ /**
+ * The node that emitted the event.
+ */
+ public abstract val node: Node
+
/**
- * This method is invoked when the state of a machine updates.
+ * This event is emitted when the state of [node] changes.
*
- * @param server The server which state was updated.
- * @param previousState The previous state of the server.
+ * @property node The node of which the state changed.
+ * @property previousState The previous state of the node.
*/
- public suspend fun onUpdate(server: Server, previousState: ServerState)
+ public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt
new file mode 100644
index 00000000..ca9cf509
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt
@@ -0,0 +1,55 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.metal
+
+/**
+ * An enumeration describing the possible states of a bare-metal compute node.
+ */
+public enum class NodeState {
+ /**
+ * The node is booting.
+ */
+ BOOT,
+
+ /**
+ * The node is powered off.
+ */
+ SHUTOFF,
+
+ /**
+ * The node is active and running.
+ */
+ ACTIVE,
+
+ /**
+ * The node is in error.
+ */
+ ERROR,
+
+ /**
+ * The state of the node is unknown.
+ */
+ UNKNOWN,
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 1214dd36..41cec291 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -26,9 +26,8 @@ package com.atlarge.opendc.compute.metal.driver
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
+import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
import kotlinx.coroutines.flow.Flow
@@ -37,7 +36,12 @@ import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
*/
-public interface BareMetalDriver : Powerable {
+public interface BareMetalDriver : Powerable, FailureDomain {
+ /**
+ * The [Node] that is controlled by this driver.
+ */
+ public val node: Flow<Node>
+
/**
* The amount of work done by the machine in percentage with respect to the total amount of processing power
* available.
@@ -47,12 +51,22 @@ public interface BareMetalDriver : Powerable {
/**
* Initialize the driver.
*/
- public suspend fun init(monitor: ServerMonitor): Node
+ public suspend fun init(): Node
+
+ /**
+ * Start the bare metal node with the specified boot disk image.
+ */
+ public suspend fun start(): Node
+
+ /**
+ * Stop the bare metal node if it is running.
+ */
+ public suspend fun stop(): Node
/**
- * Update the power state of the compute node.
+ * Reboot the bare metal node.
*/
- public suspend fun setPower(powerState: PowerState): Node
+ public suspend fun reboot(): Node
/**
* Update the boot disk image of the compute node.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index c7dc74cf..4a40dc9f 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -25,23 +25,31 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.signal.Signal
+import com.atlarge.odcsim.flow.EventFlow
+import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
+import com.atlarge.opendc.compute.core.execution.ShutdownException
+import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
+import com.atlarge.opendc.compute.metal.NodeEvent
+import com.atlarge.opendc.compute.metal.NodeState
import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
+import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -50,6 +58,7 @@ import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.withContext
+import java.lang.Exception
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -57,6 +66,7 @@ import kotlinx.coroutines.withContext
* @param domain The simulation domain the driver runs in.
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
+ * @param metadata The initial metadata of the node.
* @param cpus The CPUs available to the bare metal machine.
* @param memoryUnits The memory units in this machine.
* @param powerModel The power model of this machine.
@@ -65,128 +75,175 @@ public class SimpleBareMetalDriver(
private val domain: Domain,
uid: UUID,
name: String,
+ metadata: Map<String, Any>,
val cpus: List<ProcessingUnit>,
val memoryUnits: List<MemoryUnit>,
- powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0)
+ powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(
+ 0.0
+ )
) : BareMetalDriver {
/**
- * The monitor to use.
+ * The flavor that corresponds to this machine.
*/
- private lateinit var monitor: ServerMonitor
+ private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
/**
- * The machine state.
+ * The current active server context.
*/
- private var node: Node = Node(uid, name, PowerState.POWER_OFF, EmptyImage, null)
+ private var serverContext: BareMetalServerContext? = null
/**
- * The flavor that corresponds to this machine.
+ * The events of the machine.
*/
- private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
+ private val events = EventFlow<NodeEvent>()
/**
- * The job that is running the image.
+ * The flow containing the load of the server.
*/
- private var job: Job? = null
+ private val usageState = StateFlow(0.0)
/**
- * The signal containing the load of the server.
+ * The machine state.
*/
- private val usageSignal = Signal(0.0)
+ private val nodeState = StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events))
+
+ override val node: Flow<Node> = nodeState
- override val usage: Flow<Double> = usageSignal
+ override val usage: Flow<Double> = usageState
override val powerDraw: Flow<Double> = powerModel(this)
- override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
- this@SimpleBareMetalDriver.monitor = monitor
- return@withContext node
+ override suspend fun init(): Node = withContext(domain.coroutineContext) {
+ nodeState.value
}
- override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) {
- val previousPowerState = node.powerState
- val server = when (node.powerState to powerState) {
- PowerState.POWER_OFF to PowerState.POWER_OFF -> null
- PowerState.POWER_OFF to PowerState.POWER_ON -> Server(
- UUID.randomUUID(),
- node.name,
- emptyMap(),
- flavor,
- node.image,
- ServerState.BUILD
- )
- PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image
- PowerState.POWER_ON to PowerState.POWER_ON -> node.server
- else -> throw IllegalStateException()
+ override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
+ if (node.state != NodeState.SHUTOFF) {
+ return@withContext node
}
- server?.serviceRegistry?.set(BareMetalDriver.Key, this@SimpleBareMetalDriver)
- node = node.copy(powerState = powerState, server = server)
- if (powerState != previousPowerState && server != null) {
- launch()
+ val events = EventFlow<ServerEvent>()
+ val server = Server(
+ UUID.randomUUID(),
+ node.name,
+ emptyMap(),
+ flavor,
+ node.image,
+ ServerState.BUILD,
+ ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver),
+ events
+ )
+
+ setNode(node.copy(state = NodeState.BOOT, server = server))
+ serverContext = BareMetalServerContext(events)
+ return@withContext nodeState.value
+ }
+
+ override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
+ if (node.state == NodeState.SHUTOFF) {
+ return@withContext node
}
+ // We terminate the image running on the machine
+ serverContext!!.cancel(fail = false)
+ serverContext = null
+
+ setNode(node.copy(state = NodeState.SHUTOFF, server = null))
return@withContext node
}
+ override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ stop()
+ start()
+ }
+
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- node = node.copy(image = image)
- return@withContext node
+ setNode(nodeState.value.copy(image = image))
+ return@withContext nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
- /**
- * Launch the server image on the machine.
- */
- private suspend fun launch() {
- val serverContext = serverCtx
+ private fun setNode(value: Node) {
+ val field = nodeState.value
+ if (field.state != value.state) {
+ events.emit(NodeEvent.StateChanged(value, field.state))
+ }
- job = domain.launch {
- serverContext.init()
- try {
- node.server!!.image(serverContext)
- serverContext.exit()
- } catch (cause: Throwable) {
- serverContext.exit(cause)
- }
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
}
+
+ nodeState.value = value
}
- private val serverCtx = object : ServerManagementContext {
- private var initialized: Boolean = false
+ private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
+ private var finalized: Boolean = false
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
- override var server: Server
- get() = node.server!!
- set(value) {
- node = node.copy(server = value)
+ override val server: Server
+ get() = nodeState.value.server!!
+
+ private val job = domain.launch {
+ delay(1) // TODO Introduce boot time
+ init()
+ try {
+ server.image(this@BareMetalServerContext)
+ exit()
+ } catch (cause: Throwable) {
+ exit(cause)
}
+ }
+
+ /**
+ * Cancel the image running on the machine.
+ */
+ suspend fun cancel(fail: Boolean) {
+ if (fail)
+ job.cancel(ShutdownException(cause = Exception("Random failure")))
+ else
+ job.cancel(ShutdownException())
+ job.join()
+ }
+
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ val server = server.copy(services = server.services.put(key, service))
+ setNode(nodeState.value.copy(server = server))
+ events.emit(ServerEvent.ServicePublished(server, key))
+ }
override suspend fun init() {
- if (initialized) {
- throw IllegalStateException()
- }
+ assert(!finalized) { "Machine is already finalized" }
- val previousState = server.state
- server = server.copy(state = ServerState.ACTIVE)
- monitor.onUpdate(server, previousState)
- initialized = true
+ val server = server.copy(state = ServerState.ACTIVE)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override suspend fun exit(cause: Throwable?) {
- val previousState = server.state
- val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
- server = server.copy(state = state)
- initialized = false
- domain.launch { monitor.onUpdate(server, previousState) }
+ finalized = true
+
+ val newServerState =
+ if (cause == null || (cause is ShutdownException && cause.cause == null))
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+ val newNodeState =
+ if (cause == null || (cause is ShutdownException && cause.cause != null))
+ nodeState.value.state
+ else
+ NodeState.ERROR
+ val server = server.copy(state = newServerState)
+ setNode(nodeState.value.copy(state = newNodeState, server = server))
}
private var flush: Job? = null
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
require(burst.size == limit.size) { "Array dimensions do not match" }
+ assert(!finalized) { "Server instance is already finalized" }
// If run is called in at the same timestamp as the previous call, cancel the load flush
flush?.cancel()
@@ -209,19 +266,20 @@ public class SimpleBareMetalDriver(
}
}
- usageSignal.value = totalUsage / cpus.size
+ usageState.value = totalUsage / cpus.size
try {
delay(duration)
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
+ } catch (e: CancellationException) {
+ // On non-failure cancellation, we compute and return the remaining burst
+ e.assertFailure()
}
val end = simulationContext.clock.millis()
// Flush the load if the do not receive a new run call for the same timestamp
- flush = domain.launch {
+ flush = domain.launch(job) {
delay(1)
- usageSignal.value = 0.0
+ usageState.value = 0.0
}
flush!!.invokeOnCompletion {
flush = null
@@ -235,4 +293,14 @@ public class SimpleBareMetalDriver(
}
}
}
+
+ override val scope: CoroutineScope
+ get() = domain
+
+ override suspend fun fail() {
+ serverContext?.cancel(fail = true)
+ domain.cancel()
+ }
+
+ override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
index 24ade799..105505f2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -53,7 +52,7 @@ public interface ProvisioningService {
/**
* Deploy the specified [Image] on a compute node.
*/
- public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node
+ public suspend fun deploy(node: Node, image: Image): Node
/**
* The service key of this service.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index b18a4006..a7e143aa 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -25,31 +25,22 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor {
+public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- /**
- * The installed monitors.
- */
- private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf()
-
override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
- val node = driver.init(this@SimpleProvisioningService)
+ val node = driver.init()
nodes[node] = driver
return@withContext node
}
@@ -60,19 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService
return@withContext nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
val driver = nodes[node]!!
-
driver.setImage(image)
- driver.setPower(PowerState.POWER_OFF)
- val newNode = driver.setPower(PowerState.POWER_ON)
- monitors[newNode.server!!] = monitor
+ val newNode = driver.reboot()
return@withContext newNode
}
-
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- withContext(domain.coroutineContext) {
- monitors[server]?.onUpdate(server, previousState)
- }
- }
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
new file mode 100644
index 00000000..69b0124d
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
+import java.util.UUID
+
+/**
+ * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
+ * into several virtual guest machines.
+ */
+public class Hypervisor(
+ /**
+ * The unique identifier of the hypervisor.
+ */
+ override val uid: UUID,
+
+ /**
+ * The optional name of the hypervisor.
+ */
+ override val name: String,
+
+ /**
+ * Metadata of the hypervisor.
+ */
+ public val metadata: Map<String, Any>,
+
+ /**
+ * The events that are emitted by the hypervisor.
+ */
+ public val events: Flow<HypervisorEvent>
+) : Identity {
+ override fun hashCode(): Int = uid.hashCode()
+ override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
new file mode 100644
index 00000000..5c19b00d
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+
+/**
+ * An event that is emitted by a [VirtDriver].
+ */
+public sealed class HypervisorEvent {
+ /**
+ * The driver that emitted the event.
+ */
+ public abstract val driver: VirtDriver
+
+ /**
+ * This event is emitted when the number of active servers on the server managed by this driver is updated.
+ *
+ * @property driver The driver that emitted the event.
+ * @property numberOfActiveServers The number of active servers.
+ * @property availableMemory The available memory, in MB.
+ */
+ public data class VmsUpdated(
+ override val driver: VirtDriver,
+ public val numberOfActiveServers: Int,
+ public val availableMemory: Long
+ ) : HypervisorEvent()
+
+ /**
+ * This event is emitted when a slice is finished.
+ *
+ * @property driver The driver that emitted the event.
+ * @property requestedBurst The total requested CPU time (can be above capacity).
+ * @property grantedBurst The actual total granted capacity.
+ * @property numberOfDeployedImages The number of images deployed on this hypervisor.
+ */
+ public data class SliceFinished(
+ override val driver: VirtDriver,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val numberOfDeployedImages: Int,
+ public val hostServer: Server
+ ) : HypervisorEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 8d055953..c21b002d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -22,32 +22,36 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.core.resource.TagContainer
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
*/
-class HypervisorImage(
- private val hypervisorMonitor: HypervisorMonitor
-) : Image {
+object HypervisorImage : Image {
override val uid: UUID = UUID.randomUUID()
override val name: String = "vmm"
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
- val driver = HypervisorVirtDriver(ctx, hypervisorMonitor)
+ coroutineScope {
+ val driver = SimpleVirtDriver(ctx, this)
+ ctx.publishService(VirtDriver.Key, driver)
- ctx.publishService(VirtDriver.Key, driver)
-
- // Suspend image until it is cancelled
- suspendCancellableCoroutine<Unit> {}
+ // Suspend image until it is cancelled
+ try {
+ suspendCancellableCoroutine<Unit> {}
+ } finally {
+ driver.eventFlow.close()
+ }
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
index 926234b5..0586ae00 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
@@ -1,3 +1,3 @@
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 430e5a37..76368080 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -22,26 +22,34 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
-import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
+import com.atlarge.opendc.compute.core.execution.ShutdownException
+import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -51,11 +59,18 @@ import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
-class HypervisorVirtDriver(
+@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
+class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val monitor: HypervisorMonitor
+ private val coroutineScope: CoroutineScope
) : VirtDriver {
/**
+ * The [Server] on which this hypervisor runs.
+ */
+ private val server: Server
+ get() = hostContext.server
+
+ /**
* A set for tracking the VM context objects.
*/
internal val vms: MutableSet<VmServerContext> = mutableSetOf()
@@ -66,31 +81,38 @@ class HypervisorVirtDriver(
private var availableMemory: Long = hostContext.server.flavor.memorySize
/**
- * Monitors to keep informed.
+ * The [EventFlow] to emit the events.
*/
- private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf()
+ internal val eventFlow = EventFlow<HypervisorEvent>()
- override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server {
+ override val events: Flow<HypervisorEvent> = eventFlow
+
+ override suspend fun spawn(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server {
val requiredMemory = flavor.memorySize
if (availableMemory - requiredMemory < 0) {
throw InsufficientMemoryOnServerException()
}
require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" }
- val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
+ val events = EventFlow<ServerEvent>()
+ val server = Server(
+ UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
+ ServiceRegistry(), events
+ )
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, monitor, simulationContext))
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ vms.add(VmServerContext(server, events, simulationContext.domain))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
- override suspend fun addMonitor(monitor: VirtDriverMonitor) {
- monitors.add(monitor)
- }
-
- override suspend fun removeMonitor(monitor: VirtDriverMonitor) {
- monitors.remove(monitor)
- }
+ /**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
/**
* The set of [VmServerContext] instances that is being scheduled at the moment.
@@ -108,12 +130,12 @@ class HypervisorVirtDriver(
private suspend fun reschedule() {
flush()
- // Do not schedule a call if there is no work to schedule
- if (activeVms.isEmpty()) {
+ // Do not schedule a call if there is no work to schedule or the driver stopped.
+ if (stopped || activeVms.isEmpty()) {
return
}
- val call = simulationContext.domain.launch {
+ val call = coroutineScope.launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -200,16 +222,9 @@ class HypervisorVirtDriver(
}
}
- monitor.onSliceFinish(
- end,
- totalBurst,
- totalBurst - totalRemainder,
- vms.size,
- hostContext.server
- )
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server))
}
this.call = call
- call.invokeOnCompletion { this.call = null }
}
/**
@@ -217,9 +232,10 @@ class HypervisorVirtDriver(
*/
private fun flush() {
val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
+ // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its
// completion.
call.cancel()
+ this.call = null
}
/**
@@ -238,17 +254,19 @@ class HypervisorVirtDriver(
}
internal inner class VmServerContext(
- override var server: Server,
- val monitor: ServerMonitor,
- ctx: SimulationContext
+ server: Server,
+ val events: EventFlow<ServerEvent>,
+ val domain: Domain
) : ServerManagementContext {
+ private var finalized: Boolean = false
lateinit var requests: List<CpuRequest>
lateinit var burst: LongArray
var deadline: Long = 0L
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = ctx.domain.launch {
+ internal val job: Job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
init()
try {
server.image(this@VmServerContext)
@@ -258,28 +276,42 @@ class HypervisorVirtDriver(
}
}
+ override var server: Server = server
+ set(value) {
+ if (field.state != value.state) {
+ events.emit(ServerEvent.StateChanged(value, field.state))
+ }
+
+ field = value
+ }
+
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ server = server.copy(services = server.services.put(key, service))
+ events.emit(ServerEvent.ServicePublished(server, key))
+ }
+
override suspend fun init() {
- if (initialized) {
- throw IllegalStateException()
- }
+ assert(!finalized) { "VM is already finalized" }
- val previousState = server.state
server = server.copy(state = ServerState.ACTIVE)
- monitor.onUpdate(server, previousState)
initialized = true
}
override suspend fun exit(cause: Throwable?) {
- val previousState = server.state
- val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
- server = server.copy(state = state)
+ finalized = true
+
+ val serverState =
+ if (cause == null || (cause is ShutdownException && cause.cause == null))
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+ server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
- monitor.onUpdate(server, previousState)
- initialized = false
vms.remove(this)
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ events.close()
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
@@ -289,7 +321,14 @@ class HypervisorVirtDriver(
this.burst = burst
requests = cpus.asSequence()
.take(burst.size)
- .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+ .mapIndexed { i, cpu ->
+ CpuRequest(
+ this,
+ cpu,
+ burst[i],
+ limit[i]
+ )
+ }
.toList()
// Wait until the burst has been run or the coroutine is cancelled
@@ -297,11 +336,13 @@ class HypervisorVirtDriver(
activeVms += this
reschedule()
chan.receive()
- } catch (_: CancellationException) {
+ } catch (e: CancellationException) {
// On cancellation, we compute and return the remaining burst
+ e.assertFailure()
+ } finally {
+ activeVms -= this
+ reschedule()
}
- activeVms -= this
- reschedule()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index d889d0f9..1002d382 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -27,8 +27,9 @@ package com.atlarge.opendc.compute.virt.driver
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -37,28 +38,19 @@ import java.util.UUID
*/
public interface VirtDriver {
/**
+ * The events emitted by the driver.
+ */
+ public val events: Flow<HypervisorEvent>
+
+ /**
* Spawn the given [Image] on the compute resource of this driver.
*
+ * @param name The name of the server to spawn.
* @param image The image to deploy.
- * @param monitor The monitor to use for the deployment of this particular image.
* @param flavor The flavor of the server which this driver is controlling.
* @return The virtual server spawned by this method.
*/
- public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server
-
- /**
- * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver.
- *
- * @param monitor The monitor to keep informed.
- */
- public suspend fun addMonitor(monitor: VirtDriverMonitor)
-
- /**
- * Removes the given [VirtDriverMonitor] from the list of monitors.
- *
- * @param monitor The monitor to unsubscribe
- */
- public suspend fun removeMonitor(monitor: VirtDriverMonitor)
+ public suspend fun spawn(name: String, image: Image, flavor: Flavor): Server
companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
deleted file mode 100644
index cf2f4619..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.atlarge.opendc.compute.virt.driver
-
-/**
- * Monitor for entities interested in the state of a [VirtDriver].
- */
-interface VirtDriverMonitor {
- /**
- * Called when the number of active servers on the server managed by this driver is updated.
- *
- * @param numberOfActiveServers The number of active servers.
- * @param availableMemory The available memory, in MB.
- */
- public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long)
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
deleted file mode 100644
index 1e3981f6..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.atlarge.opendc.compute.virt.monitor
-
-import com.atlarge.opendc.compute.core.Server
-
-/**
- * Monitoring interface for hypervisor-specific events.
- */
-interface HypervisorMonitor {
- /**
- * Invoked after a scheduling slice has finished processed.
- *
- * @param time The current time (in ms).
- * @param requestedBurst The total requested CPU time (can be above capacity).
- * @param grantedBurst The actual total granted capacity.
- * @param numberOfDeployedImages The number of images deployed on this hypervisor.
- * @param hostServer The server hosting this hypervisor.
- */
- suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- )
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
new file mode 100644
index 00000000..97842f18
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
@@ -0,0 +1,12 @@
+package com.atlarge.opendc.compute.virt.service
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+
+class HypervisorView(
+ var server: Server,
+ var numberOfActiveServers: Int,
+ var availableMemory: Long
+) {
+ lateinit var driver: VirtDriver
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
deleted file mode 100644
index 41e67624..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.atlarge.opendc.compute.virt.service
-
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-
-class NodeView(
- val node: Node,
- val hypervisor: HypervisorImage,
- var numberOfActiveServers: Int,
- var availableMemory: Long
-)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 17960186..156521db 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -3,123 +3,175 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.compute.virt.HypervisorImage
+import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService,
- private val hypervisorMonitor: HypervisorMonitor
-) : VirtProvisioningService, ServerMonitor {
+ private val provisioningService: ProvisioningService
+) : VirtProvisioningService, CoroutineScope by ctx.domain {
/**
- * The nodes that are controlled by the service.
+ * The hypervisors that have been launched by the service.
*/
- internal lateinit var nodes: List<Node>
+ private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf()
/**
- * The available nodes.
+ * The available hypervisors.
*/
- internal val availableNodes: MutableSet<NodeView> = mutableSetOf()
+ private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf()
/**
* The incoming images to be processed by the provisioner.
*/
- internal val incomingImages: MutableSet<ImageView> = mutableSetOf()
+ private val incomingImages: MutableSet<ImageView> = mutableSetOf()
/**
* The active images in the system.
*/
- internal val activeImages: MutableSet<ImageView> = mutableSetOf()
+ private val activeImages: MutableSet<ImageView> = mutableSetOf()
init {
- ctx.domain.launch {
- val provisionedNodes = provisioningService.nodes().toList()
- val deployedNodes = provisionedNodes.map { node ->
- val hypervisorImage = HypervisorImage(hypervisorMonitor)
- val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
- val nodeView = NodeView(
- deployedNode,
- hypervisorImage,
- 0,
- deployedNode.server!!.flavor.memorySize
- )
- yield()
- deployedNode.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
- override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
- nodeView.numberOfActiveServers = numberOfActiveServers
- nodeView.availableMemory = availableMemory
+ launch {
+ val provisionedNodes = provisioningService.nodes()
+ provisionedNodes.forEach { node ->
+ val hypervisorImage = HypervisorImage
+ val node = provisioningService.deploy(node, hypervisorImage)
+ node.server!!.events.onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> stateChanged(event.server)
+ is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
}
- })
- nodeView
+ }.launchIn(this)
}
- nodes = deployedNodes.map { it.node }
- availableNodes.addAll(deployedNodes)
}
}
- override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) {
- val vmInstance = ImageView(image, monitor, flavor)
- incomingImages += vmInstance
- requestCycle()
+ override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
+ availableHypervisors.map { it.driver }.toSet()
+ }
+
+ override suspend fun deploy(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server = withContext(coroutineContext) {
+ suspendCancellableCoroutine<Server> { cont ->
+ val vmInstance = ImageView(name, image, flavor, cont)
+ incomingImages += vmInstance
+ requestCycle()
+ }
}
+ private var call: Job? = null
+
private fun requestCycle() {
- ctx.domain.launch {
+ if (call != null) {
+ return
+ }
+
+ val call = launch {
+ delay(1)
+ this@SimpleVirtProvisioningService.call = null
schedule()
}
+ this.call = call
}
private suspend fun schedule() {
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
- println("Spawning $imageInstance")
+ val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break
+ try {
+ println("Spawning ${imageInstance.image}")
+ incomingImages -= imageInstance
- val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid })
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ selectedHv.numberOfActiveServers++
+ selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack
- try {
- imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn(
+ val server = selectedHv.driver.spawn(
+ imageInstance.name,
imageInstance.image,
- imageInstance.monitor,
imageInstance.flavor
)
+ imageInstance.server = server
+ imageInstance.continuation.resume(server)
activeImages += imageInstance
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
- }
- incomingImages -= imageInstance
+ selectedHv.numberOfActiveServers--
+ selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory
+ }
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- // TODO handle hypervisor server becoming active
+ val hvView = HypervisorView(
+ server,
+ 0,
+ server.flavor.memorySize
+ )
+ hypervisors[server] = hvView
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- // TODO handle hypervisor server shutting down or failing
+ val hv = hypervisors[server] ?: return
+ availableHypervisors -= hv
+ requestCycle()
}
else -> throw IllegalStateException()
}
}
- class ImageView(
+ private fun servicePublished(server: Server, key: ServiceKey<*>) {
+ if (key == VirtDriver.Key) {
+ val hv = hypervisors[server] ?: return
+ hv.driver = server.services[VirtDriver]
+ availableHypervisors += hv
+
+ hv.driver.events
+ .onEach { event ->
+ if (event is HypervisorEvent.VmsUpdated) {
+ hv.numberOfActiveServers = event.numberOfActiveServers
+ hv.availableMemory = event.availableMemory
+ }
+ }.launchIn(this)
+
+ requestCycle()
+ }
+ }
+
+ data class ImageView(
+ val name: String,
val image: Image,
- val monitor: ServerMonitor,
val flavor: Flavor,
+ val continuation: Continuation<Server>,
var server: Server? = null
)
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 7770ec50..a3ade2fb 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -1,8 +1,9 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
/**
@@ -12,11 +13,16 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * Obtain the active hypervisors for this provisioner.
+ */
+ public suspend fun drivers(): Set<VirtDriver>
+
+ /**
* Submit the specified [Image] to the provisioning service.
*
+ * @param name The name of the server to deploy.
* @param image The image to be deployed.
- * @param monitor The monitor to inform on events.
* @param flavor The flavor of the machine instance to run this [image] on.
*/
- public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor)
+ public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
index a1c0ab9a..e2871cca 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
@@ -1,7 +1,7 @@
package com.atlarge.opendc.compute.virt.service.allocation
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -10,5 +10,5 @@ interface AllocationPolicy {
/**
* Builds the logic of the policy.
*/
- operator fun invoke(): Comparator<NodeView>
+ operator fun invoke(): Comparator<HypervisorView>
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
index b3e9d77e..f095849b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
@@ -1,12 +1,12 @@
package com.atlarge.opendc.compute.virt.service.allocation
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
/**
* Allocation policy that selects the node with the most available memory.
*/
class AvailableMemoryAllocationPolicy : AllocationPolicy {
- override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 ->
compareValuesBy(o1, o2) { -it.availableMemory }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
index 9d6582dd..59e48465 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
@@ -1,13 +1,13 @@
package com.atlarge.opendc.compute.virt.service.allocation
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
import kotlinx.coroutines.runBlocking
/**
* Allocation policy that selects the node with the least amount of active servers.
*/
class NumberOfActiveServersAllocationPolicy : AllocationPolicy {
- override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 ->
runBlocking {
compareValuesBy(o1, o2) { it.numberOfActiveServers }
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index b8882eda..0fc64373 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,14 +25,12 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.PowerState
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
@@ -55,21 +53,19 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
-
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}] $server")
- finalState = server.state
- }
- }
+ val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
withContext(dom.coroutineContext) {
- driver.init(monitor)
+ driver.init()
driver.setImage(image)
- driver.setPower(PowerState.POWER_ON)
+ val server = driver.start().server!!
+ server.events.collect { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
+ }
+ }
}
}
@@ -78,6 +74,6 @@ internal class SimpleBareMetalDriverTest {
system.terminate()
}
- assertEquals(finalState, ServerState.SHUTOFF)
+ assertEquals(ServerState.SHUTOFF, finalState)
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index a837130d..f8bd786e 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -27,12 +27,10 @@ package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -53,23 +51,18 @@ internal class SimpleProvisioningServiceTest {
val root = system.newDomain(name = "root")
root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
- }
- }
-
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val provisioner = SimpleProvisioningService(dom)
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
- provisioner.deploy(nodes.first(), image, monitor)
+ val node = provisioner.deploy(nodes.first(), image)
+ node.server!!.events.collect { println(it) }
}
runBlocking {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 254ad5fe..58d784b0 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -22,22 +22,19 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -51,6 +48,7 @@ internal class HypervisorTest {
/**
* A smoke test for the bare-metal driver.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
@@ -58,41 +56,29 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
- val vmm = HypervisorImage(object : HypervisorMonitor {
- override suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- ) {
- println("Hello World!")
- }
- })
+ val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}]: $server")
- }
- }
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- metalDriver.init(monitor)
+ metalDriver.init()
metalDriver.setImage(vmm)
- metalDriver.setPower(PowerState.POWER_ON)
+ val node = metalDriver.start()
+ node.server?.events?.onEach { println(it) }?.launchIn(this)
delay(5)
val flavor = Flavor(1, 0)
- val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver]
- vmDriver.spawn(workloadA, monitor, flavor)
- vmDriver.spawn(workloadB, monitor, flavor)
+ val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
+ val vmA = vmDriver.spawn("a", workloadA, flavor)
+ vmA.events.onEach { println(it) }.launchIn(this)
+ val vmB = vmDriver.spawn("b", workloadB, flavor)
+ vmB.events.onEach { println(it) }.launchIn(this)
}
runBlocking {