summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt8
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt)29
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt)23
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt62
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt30
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt59
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt13
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt67
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt25
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt36
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt4
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt27
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt13
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt34
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt3
21 files changed, 239 insertions, 250 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 31b070a4..01968cd8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -67,7 +68,12 @@ public data class Server(
/**
* The services published by this server.
*/
- public val services: ServiceRegistry = ServiceRegistry()
+ public val services: ServiceRegistry,
+
+ /**
+ * The events that are emitted by the server.
+ */
+ public val events: Flow<ServerEvent>
) : Resource {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
index c2b30b9d..1595937c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
@@ -22,29 +22,32 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.core.monitor
+package com.atlarge.opendc.compute.core
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.core.services.ServiceKey
/**
- * An interface for monitoring the state of a machine.
+ * An event that is emitted by a [Server].
*/
-public interface ServerMonitor {
+public sealed class ServerEvent {
/**
- * This method is synchronously invoked when the state of a machine updates.
+ * The server that emitted the event.
+ */
+ public abstract val server: Server
+
+ /**
+ * This event is emitted when the state of [server] changes.
*
- * @param server The server which state was updated.
- * @param previousState The previous state of the server.
+ * @property server The server of which the state changed.
+ * @property previousState The previous state of the server.
*/
- public fun stateChanged(server: Server, previousState: ServerState) {}
+ public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent()
/**
- * This method is synchronously invoked when the server publishes a service.
+ * This event is emitted when a server publishes a service.
*
- * @param server The server that published the service.
- * @param key The key of the service that was published.
+ * @property server The server that published the service.
+ * @property key The service key of the service that was published.
*/
- public fun servicePublished(server: Server, key: ServiceKey<*>) {}
+ public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
index abf6f8db..e4da557b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.compute.core.execution
import kotlinx.coroutines.CancellationException
/**
- * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown signal
+ * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow
* has been sent to the server.
*/
public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index 55948d3c..8b8d1596 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -46,7 +47,7 @@ data class Node(
/**
* Meta data of the node.
*/
- public val metadata: Map<String, Any> = emptyMap(),
+ public val metadata: Map<String, Any>,
/**
* The last known state of the compute node.
@@ -61,7 +62,12 @@ data class Node(
/**
* The server instance that is running on the node or `null` if no server is running.
*/
- public val server: Server?
+ public val server: Server?,
+
+ /**
+ * The events that are emitted by the node.
+ */
+ public val events: Flow<NodeEvent>
) : Identity {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Node && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
index bd4b40d8..7719db24 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
@@ -22,21 +22,22 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.metal.monitor
-
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.NodeState
+package com.atlarge.opendc.compute.metal
/**
- * An interface for monitoring bare-metal nodes.
+ * An event that is emitted by a [Node].
*/
-public interface NodeMonitor : ServerMonitor {
+public sealed class NodeEvent {
+ /**
+ * The node that emitted the event.
+ */
+ public abstract val node: Node
+
/**
- * This method is synchronously invoked when the state of a bare metal machine updates.
+ * This event is emitted when the state of [node] changes.
*
- * @param node The node for which state was updated.
- * @param previousState The previous state of the node.
+ * @property node The node of which the state changed.
+ * @property previousState The previous state of the node.
*/
- public fun stateChanged(node: Node, previousState: NodeState) {}
+ public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 3956338b..5d1db378 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -27,7 +27,6 @@ package com.atlarge.opendc.compute.metal.driver
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -47,7 +46,7 @@ public interface BareMetalDriver : Powerable, FailureDomain {
/**
* Initialize the driver.
*/
- public suspend fun init(monitor: NodeMonitor): Node
+ public suspend fun init(): Node
/**
* Start the bare metal node with the specified boot disk image.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 46b4c30c..49c3fa2e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -25,12 +25,14 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.signal.Signal
+import com.atlarge.odcsim.flow.EventFlow
+import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
@@ -38,8 +40,8 @@ import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.metal.NodeEvent
import com.atlarge.opendc.compute.metal.NodeState
-import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
@@ -77,48 +79,47 @@ public class SimpleBareMetalDriver(
powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0)
) : BareMetalDriver {
/**
- * The monitor to use.
+ * The flavor that corresponds to this machine.
+ */
+ private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
+
+ /**
+ * The current active server context.
+ */
+ private var serverContext: BareMetalServerContext? = null
+
+ /**
+ * The events of the machine.
*/
- private lateinit var monitor: NodeMonitor
+ private val events = EventFlow<NodeEvent>()
+
+ /**
+ * The flow containing the load of the server.
+ */
+ private val usageSignal = StateFlow(0.0)
/**
* The machine state.
*/
- private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null)
+ private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)
set(value) {
if (field.state != value.state) {
- monitor.stateChanged(value, field.state)
+ events.emit(NodeEvent.StateChanged(value, field.state))
}
if (field.server != null && value.server != null && field.server!!.state != value.server.state) {
- monitor.stateChanged(value.server, field.server!!.state)
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state))
}
field = value
}
- /**
- * The flavor that corresponds to this machine.
- */
- private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
-
- /**
- * The current active server context.
- */
- private var serverContext: BareMetalServerContext? = null
-
- /**
- * The signal containing the load of the server.
- */
- private val usageSignal = Signal(0.0)
-
override val usage: Flow<Double> = usageSignal
override val powerDraw: Flow<Double> = powerModel(this)
- override suspend fun init(monitor: NodeMonitor): Node = withContext(domain.coroutineContext) {
- this@SimpleBareMetalDriver.monitor = monitor
- return@withContext node
+ override suspend fun init(): Node = withContext(domain.coroutineContext) {
+ node
}
override suspend fun start(): Node = withContext(domain.coroutineContext) {
@@ -126,6 +127,7 @@ public class SimpleBareMetalDriver(
return@withContext node
}
+ val events = EventFlow<ServerEvent>()
val server = Server(
UUID.randomUUID(),
node.name,
@@ -133,11 +135,12 @@ public class SimpleBareMetalDriver(
flavor,
node.image,
ServerState.BUILD,
- ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver)
+ ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver),
+ events
)
node = node.copy(state = NodeState.BOOT, server = server)
- serverContext = BareMetalServerContext()
+ serverContext = BareMetalServerContext(events)
return@withContext node
}
@@ -166,7 +169,7 @@ public class SimpleBareMetalDriver(
override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
- private inner class BareMetalServerContext : ServerManagementContext {
+ private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
@@ -175,6 +178,7 @@ public class SimpleBareMetalDriver(
get() = node.server!!
private val job = domain.launch {
+ delay(1) // TODO Introduce boot time
init()
try {
server.image(this@BareMetalServerContext)
@@ -198,7 +202,7 @@ public class SimpleBareMetalDriver(
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
val server = server.copy(services = server.services.put(key, service))
node = node.copy(server = server)
- monitor.servicePublished(server, key)
+ events.emit(ServerEvent.ServicePublished(server, key))
}
override suspend fun init() {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
index 24ade799..105505f2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -53,7 +52,7 @@ public interface ProvisioningService {
/**
* Deploy the specified [Image] on a compute node.
*/
- public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node
+ public suspend fun deploy(node: Node, image: Image): Node
/**
* The service key of this service.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index e5cd0a77..a7e143aa 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -25,33 +25,22 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
-import com.atlarge.opendc.core.services.ServiceKey
-import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, NodeMonitor {
+public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- /**
- * The installed monitors.
- */
- private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf()
-
override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
- val node = driver.init(this@SimpleProvisioningService)
+ val node = driver.init()
nodes[node] = driver
return@withContext node
}
@@ -62,23 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService
return@withContext nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
val driver = nodes[node]!!
driver.setImage(image)
val newNode = driver.reboot()
- monitors[newNode.server!!] = monitor
return@withContext newNode
}
-
- override fun stateChanged(server: Server, previousState: ServerState) {
- domain.launch {
- monitors[server]?.stateChanged(server, previousState)
- }
- }
-
- override fun servicePublished(server: Server, key: ServiceKey<*>) {
- domain.launch {
- monitors[server]?.servicePublished(server, key)
- }
- }
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index d889d0f9..296f170e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.virt.driver
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.core.services.AbstractServiceKey
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -37,28 +37,18 @@ import java.util.UUID
*/
public interface VirtDriver {
/**
+ * The events emitted by the driver.
+ */
+ public val events: Flow<VirtDriverEvent>
+
+ /**
* Spawn the given [Image] on the compute resource of this driver.
*
* @param image The image to deploy.
- * @param monitor The monitor to use for the deployment of this particular image.
* @param flavor The flavor of the server which this driver is controlling.
* @return The virtual server spawned by this method.
*/
- public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server
-
- /**
- * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver.
- *
- * @param monitor The monitor to keep informed.
- */
- public suspend fun addMonitor(monitor: VirtDriverMonitor)
-
- /**
- * Removes the given [VirtDriverMonitor] from the list of monitors.
- *
- * @param monitor The monitor to unsubscribe
- */
- public suspend fun removeMonitor(monitor: VirtDriverMonitor)
+ public suspend fun spawn(image: Image, flavor: Flavor): Server
companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt
new file mode 100644
index 00000000..ccbe8b3c
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt
@@ -0,0 +1,59 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt.driver
+
+/**
+ * An event that is emitted by a [VirtDriver].
+ */
+public sealed class VirtDriverEvent {
+ /**
+ * The driver that emitted the event.
+ */
+ public abstract val driver: VirtDriver
+
+ /**
+ * This event is emitted when the number of active servers on the server managed by this driver is updated.
+ *
+ * @property driver The driver that emitted the event.
+ * @property numberOfActiveServers The number of active servers.
+ * @property availableMemory The available memory, in MB.
+ */
+ public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent()
+
+ /**
+ * This event is emitted when a slice is finished.
+ *
+ * @property driver The driver that emitted the event.
+ * @property requestedBurst The total requested CPU time (can be above capacity).
+ * @property grantedBurst The actual total granted capacity.
+ * @property numberOfDeployedImages The number of images deployed on this hypervisor.
+ */
+ public data class SliceFinished(
+ override val driver: VirtDriver,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val numberOfDeployedImages: Int
+ ) : VirtDriverEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
deleted file mode 100644
index cf2f4619..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.atlarge.opendc.compute.virt.driver
-
-/**
- * Monitor for entities interested in the state of a [VirtDriver].
- */
-interface VirtDriverMonitor {
- /**
- * Called when the number of active servers on the server managed by this driver is updated.
- *
- * @param numberOfActiveServers The number of active servers.
- * @param availableMemory The available memory, in MB.
- */
- public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long)
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
index 0f4d3c15..1eb0e0ff 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
@@ -27,7 +27,6 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
@@ -36,20 +35,22 @@ import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
*/
-class HypervisorImage(
- private val hypervisorMonitor: HypervisorMonitor
-) : Image {
+object HypervisorImage : Image {
override val uid: UUID = UUID.randomUUID()
override val name: String = "vmm"
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
coroutineScope {
- val driver = HypervisorVirtDriver(ctx, hypervisorMonitor, this)
+ val driver = HypervisorVirtDriver(ctx, this)
ctx.publishService(VirtDriver.Key, driver)
// Suspend image until it is cancelled
- suspendCancellableCoroutine<Unit> {}
+ try {
+ suspendCancellableCoroutine<Unit> {}
+ } finally {
+ driver.eventFlow.close()
+ }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index 98d8092c..0b4a7109 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -25,28 +25,33 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent
import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -56,12 +61,18 @@ import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
+@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class HypervisorVirtDriver(
private val hostContext: ServerContext,
- private val monitor: HypervisorMonitor,
private val coroutineScope: CoroutineScope
) : VirtDriver {
/**
+ * The [Server] on which this hypervisor runs.
+ */
+ public val server: Server
+ get() = hostContext.server
+
+ /**
* A set for tracking the VM context objects.
*/
internal val vms: MutableSet<VmServerContext> = mutableSetOf()
@@ -72,32 +83,33 @@ class HypervisorVirtDriver(
private var availableMemory: Long = hostContext.server.flavor.memorySize
/**
- * Monitors to keep informed.
+ * The [EventFlow] to emit the events.
*/
- private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf()
+ internal val eventFlow = EventFlow<VirtDriverEvent>()
- override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server {
+ override val events: Flow<VirtDriverEvent> = eventFlow
+
+ override suspend fun spawn(
+ image: Image,
+ flavor: Flavor
+ ): Server {
val requiredMemory = flavor.memorySize
if (availableMemory - requiredMemory < 0) {
throw InsufficientMemoryOnServerException()
}
require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" }
- val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
+ val events = EventFlow<ServerEvent>()
+ val server = Server(
+ UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD,
+ ServiceRegistry(), events
+ )
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, monitor, simulationContext.domain))
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ vms.add(VmServerContext(server, events, simulationContext.domain))
+ eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
- override suspend fun addMonitor(monitor: VirtDriverMonitor) {
- monitors.add(monitor)
- }
-
- override suspend fun removeMonitor(monitor: VirtDriverMonitor) {
- monitors.remove(monitor)
- }
-
/**
* A flag to indicate the driver is stopped.
*/
@@ -211,13 +223,7 @@ class HypervisorVirtDriver(
}
}
- monitor.onSliceFinish(
- end,
- totalBurst,
- totalBurst - totalRemainder,
- vms.size,
- hostContext.server
- )
+ eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
}
this.call = call
}
@@ -250,7 +256,7 @@ class HypervisorVirtDriver(
internal inner class VmServerContext(
server: Server,
- val monitor: ServerMonitor,
+ val events: EventFlow<ServerEvent>,
val domain: Domain
) : ServerManagementContext {
private var finalized: Boolean = false
@@ -261,6 +267,7 @@ class HypervisorVirtDriver(
private var initialized: Boolean = false
internal val job: Job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
init()
try {
server.image(this@VmServerContext)
@@ -273,7 +280,7 @@ class HypervisorVirtDriver(
override var server: Server = server
set(value) {
if (field.state != value.state) {
- monitor.stateChanged(value, field.state)
+ events.emit(ServerEvent.StateChanged(value, field.state))
}
field = value
@@ -283,7 +290,7 @@ class HypervisorVirtDriver(
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
server = server.copy(services = server.services.put(key, service))
- monitor.servicePublished(server, key)
+ events.emit(ServerEvent.ServicePublished(server, key))
}
override suspend fun init() {
@@ -304,8 +311,8 @@ class HypervisorVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
-
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ events.close()
+ eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory))
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
deleted file mode 100644
index 1e3981f6..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.atlarge.opendc.compute.virt.monitor
-
-import com.atlarge.opendc.compute.core.Server
-
-/**
- * Monitoring interface for hypervisor-specific events.
- */
-interface HypervisorMonitor {
- /**
- * Invoked after a scheduling slice has finished processed.
- *
- * @param time The current time (in ms).
- * @param requestedBurst The total requested CPU time (can be above capacity).
- * @param grantedBurst The actual total granted capacity.
- * @param numberOfDeployedImages The number of images deployed on this hypervisor.
- * @param hostServer The server hosting this hypervisor.
- */
- suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- )
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 6fb821d7..8365f8c9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -3,25 +3,27 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService,
- private val hypervisorMonitor: HypervisorMonitor
-) : VirtProvisioningService, ServerMonitor {
+ private val provisioningService: ProvisioningService
+) : VirtProvisioningService {
/**
* The hypervisors that have been launched by the service.
*/
@@ -46,14 +48,24 @@ class SimpleVirtProvisioningService(
ctx.domain.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
- val hypervisorImage = HypervisorImage(hypervisorMonitor)
- provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
+ val hypervisorImage = HypervisorImage
+ val node = provisioningService.deploy(node, hypervisorImage)
+ node.server!!.events.onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState)
+ is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
+ }
+ }
+ .launchIn(ctx.domain)
}
}
}
- override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) {
- val vmInstance = ImageView(image, monitor, flavor)
+ override suspend fun deploy(
+ image: Image,
+ flavor: Flavor
+ ) {
+ val vmInstance = ImageView(image, flavor)
incomingImages += vmInstance
requestCycle()
}
@@ -82,7 +94,6 @@ class SimpleVirtProvisioningService(
incomingImages -= imageInstance
imageInstance.server = selectedHv.driver.spawn(
imageInstance.image,
- imageInstance.monitor,
imageInstance.flavor
)
activeImages += imageInstance
@@ -92,7 +103,7 @@ class SimpleVirtProvisioningService(
}
}
- override fun stateChanged(server: Server, previousState: ServerState) {
+ private fun stateChanged(server: Server, previousState: ServerState) {
when (server.state) {
ServerState.ACTIVE -> {
val hvView = HypervisorView(
@@ -111,7 +122,7 @@ class SimpleVirtProvisioningService(
}
}
- override fun servicePublished(server: Server, key: ServiceKey<*>) {
+ private fun servicePublished(server: Server, key: ServiceKey<*>) {
if (key == VirtDriver.Key) {
val hv = hypervisors[server] ?: return
hv.driver = server.services[VirtDriver]
@@ -122,7 +133,6 @@ class SimpleVirtProvisioningService(
data class ImageView(
val image: Image,
- val monitor: ServerMonitor,
val flavor: Flavor,
var server: Server? = null
)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 7770ec50..da72d742 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -2,7 +2,6 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
/**
@@ -15,8 +14,7 @@ interface VirtProvisioningService {
* Submit the specified [Image] to the provisioning service.
*
* @param image The image to be deployed.
- * @param monitor The monitor to inform on events.
* @param flavor The flavor of the machine instance to run this [image] on.
*/
- public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor)
+ public suspend fun deploy(image: Image, flavor: Flavor)
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index c5c0441c..e0d8799f 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,15 +25,12 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.NodeState
-import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
@@ -57,24 +54,18 @@ internal class SimpleBareMetalDriverTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
-
- val monitor = object : NodeMonitor {
- override fun stateChanged(node: Node, previousState: NodeState) {
- println(node)
- }
-
- override fun stateChanged(server: Server, previousState: ServerState) {
- println("$server")
- finalState = server.state
- }
- }
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
withContext(dom.coroutineContext) {
- driver.init(monitor)
+ driver.init()
driver.setImage(image)
- driver.start()
+ val server = driver.start().server!!
+ server.events.collect { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> finalState = event.server.state
+ }
+ }
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index 9cbb9baa..8e07c09c 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -27,12 +27,10 @@ package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -53,12 +51,6 @@ internal class SimpleProvisioningServiceTest {
val root = system.newDomain(name = "root")
root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- val monitor = object : ServerMonitor {
- override fun stateChanged(server: Server, previousState: ServerState) {
- println(server)
- }
- }
-
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
@@ -69,7 +61,8 @@ internal class SimpleProvisioningServiceTest {
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
- provisioner.deploy(nodes.first(), image, monitor)
+ val node = provisioner.deploy(nodes.first(), image)
+ node.server!!.events.collect { println(it) }
}
runBlocking {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index 9ceaf704..d86045c0 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -26,16 +26,15 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -49,6 +48,7 @@ internal class HypervisorTest {
/**
* A smoke test for the bare-metal driver.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
@@ -56,24 +56,9 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
- val vmm = HypervisorImage(object : HypervisorMonitor {
- override suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- ) {
- println("Hello World!")
- }
- })
+ val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
- val monitor = object : NodeMonitor {
- override fun stateChanged(server: Server, previousState: ServerState) {
- println("$server")
- }
- }
val driverDom = root.newDomain("driver")
@@ -81,16 +66,17 @@ internal class HypervisorTest {
val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) }
val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList())
- metalDriver.init(monitor)
+ metalDriver.init()
metalDriver.setImage(vmm)
- metalDriver.start()
+ val node = metalDriver.start()
+ node.server?.events?.onEach { println(it) }?.launchIn(this)
delay(5)
val flavor = Flavor(1, 0)
val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
- vmDriver.spawn(workloadA, monitor, flavor)
- vmDriver.spawn(workloadB, monitor, flavor)
+ vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this)
+ vmDriver.spawn(workloadB, flavor)
}
runBlocking {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index efc85653..4273c39e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -112,7 +112,6 @@ fun main(args: Array<String>) {
AvailableMemoryAllocationPolicy(),
simulationContext,
bareMetalProvisioner,
- monitor
)
val faultInjectorDomain = root.newDomain(name = "failures")
@@ -133,7 +132,7 @@ fun main(args: Array<String>) {
val (time, workload) = reader.next()
delay(max(0, time - simulationContext.clock.millis()))
chan.send(Unit)
- scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
+ scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory))
}
println(simulationContext.clock.instant())