summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-17 22:26:15 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:48:58 +0100
commitb1cf9b2bd9559328c3c9d26e73123e67d2bfea05 (patch)
tree62de7a5a2b386e1467171578742dc983bd9f7c19
parent6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (diff)
refactor: Rework monitor interfaces
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt8
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt13
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt44
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt7
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt56
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt6
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt2
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt7
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt19
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt20
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt7
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt1
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt6
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt5
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt5
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt64
20 files changed, 174 insertions, 143 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 86ec9a5b..31b070a4 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,7 +28,6 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
import java.util.UUID
/**
@@ -68,7 +67,7 @@ public data class Server(
/**
* The services published by this server.
*/
- public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl()
+ public val services: ServiceRegistry = ServiceRegistry()
) : Resource {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index b09a5a7d..c8caaca6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.compute.core.execution
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.core.services.AbstractServiceKey
+import com.atlarge.opendc.core.services.ServiceKey
/**
* Represents the execution context in which an bootable [Image] runs on a [Server].
@@ -44,11 +44,9 @@ public interface ServerContext {
public val cpus: List<ProcessingUnit>
/**
- * Publishes the given [service] with key [serviceKey] in the server's registry.
+ * Publish the specified [service] at the given [ServiceKey].
*/
- public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) {
- server.serviceRegistry[serviceKey] = service
- }
+ public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
/**
* Request the specified burst time from the processor cores and suspend execution until a processor core finishes
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
index 26b94ba5..c2b30b9d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
@@ -26,16 +26,25 @@ package com.atlarge.opendc.compute.core.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.core.services.ServiceKey
/**
* An interface for monitoring the state of a machine.
*/
public interface ServerMonitor {
/**
- * This method is invoked when the state of a machine updates.
+ * This method is synchronously invoked when the state of a machine updates.
*
* @param server The server which state was updated.
* @param previousState The previous state of the server.
*/
- public suspend fun onUpdate(server: Server, previousState: ServerState) {}
+ public fun stateChanged(server: Server, previousState: ServerState) {}
+
+ /**
+ * This method is synchronously invoked when the server publishes a service.
+ *
+ * @param server The server that published the service.
+ * @param key The key of the service that was published.
+ */
+ public fun servicePublished(server: Server, key: ServiceKey<*>) {}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index a8f3d781..46b4c30c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -42,6 +42,8 @@ import com.atlarge.opendc.compute.metal.NodeState
import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
+import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@@ -83,20 +85,18 @@ public class SimpleBareMetalDriver(
* The machine state.
*/
private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null)
+ set(value) {
+ if (field.state != value.state) {
+ monitor.stateChanged(value, field.state)
+ }
- private suspend fun setNode(value: Node) {
- val field = node
- if (field.state != value.state) {
- monitor.onUpdate(value, field.state)
- }
+ if (field.server != null && value.server != null && field.server!!.state != value.server.state) {
+ monitor.stateChanged(value.server, field.server!!.state)
+ }
- if (field.server != null && value.server != null && field.server.state != value.server.state) {
- monitor.onUpdate(value.server, field.server.state)
+ field = value
}
- node = value
- }
-
/**
* The flavor that corresponds to this machine.
*/
@@ -132,11 +132,11 @@ public class SimpleBareMetalDriver(
emptyMap(),
flavor,
node.image,
- ServerState.BUILD
+ ServerState.BUILD,
+ ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver)
)
- server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver
- setNode(node.copy(state = NodeState.BOOT, server = server))
+ node = node.copy(state = NodeState.BOOT, server = server)
serverContext = BareMetalServerContext()
return@withContext node
}
@@ -150,7 +150,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- setNode(node.copy(state = NodeState.SHUTOFF, server = null))
+ node = node.copy(state = NodeState.SHUTOFF, server = null)
return@withContext node
}
@@ -160,7 +160,7 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- setNode(node.copy(image = image))
+ node = node.copy(image = image)
return@withContext node
}
@@ -195,11 +195,17 @@ public class SimpleBareMetalDriver(
job.join()
}
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ val server = server.copy(services = server.services.put(key, service))
+ node = node.copy(server = server)
+ monitor.servicePublished(server, key)
+ }
+
override suspend fun init() {
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- setNode(node.copy(state = NodeState.ACTIVE, server = server))
+ node = node.copy(state = NodeState.ACTIVE, server = server)
}
override suspend fun exit(cause: Throwable?) {
@@ -216,7 +222,7 @@ public class SimpleBareMetalDriver(
else
NodeState.ERROR
val server = server.copy(state = serverState)
- setNode(node.copy(state = nodeState, server = server))
+ node = node.copy(state = nodeState, server = server)
}
private var flush: Job? = null
@@ -278,8 +284,6 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- withContext(domain.coroutineContext) {
- serverContext?.cancel(fail = true)
- }
+ serverContext?.cancel(fail = true)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt
index f35cf57b..bd4b40d8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt
@@ -33,10 +33,10 @@ import com.atlarge.opendc.compute.metal.NodeState
*/
public interface NodeMonitor : ServerMonitor {
/**
- * This method is invoked when the state of a bare metal machine updates.
+ * This method is synchronously invoked when the state of a bare metal machine updates.
*
* @param node The node for which state was updated.
* @param previousState The previous state of the node.
*/
- public suspend fun onUpdate(node: Node, previousState: NodeState) {}
+ public fun stateChanged(node: Node, previousState: NodeState) {}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index d8fe0dd9..e5cd0a77 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -32,6 +32,8 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
+import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
/**
@@ -68,9 +70,15 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService
return@withContext newNode
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- withContext(domain.coroutineContext) {
- monitors[server]?.onUpdate(server, previousState)
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ domain.launch {
+ monitors[server]?.stateChanged(server, previousState)
+ }
+ }
+
+ override fun servicePublished(server: Server, key: ServiceKey<*>) {
+ domain.launch {
+ monitors[server]?.servicePublished(server, key)
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index 1ff33c0c..98d8092c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -39,6 +39,7 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
@@ -248,7 +249,7 @@ class HypervisorVirtDriver(
}
internal inner class VmServerContext(
- override var server: Server,
+ server: Server,
val monitor: ServerMonitor,
val domain: Domain
) : ServerManagementContext {
@@ -269,21 +270,26 @@ class HypervisorVirtDriver(
}
}
- private suspend fun setServer(value: Server) {
- val field = server
- if (field.state != value.state) {
- monitor.onUpdate(value, field.state)
- }
+ override var server: Server = server
+ set(value) {
+ if (field.state != value.state) {
+ monitor.stateChanged(value, field.state)
+ }
- server = value
- }
+ field = value
+ }
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ server = server.copy(services = server.services.put(key, service))
+ monitor.servicePublished(server, key)
+ }
+
override suspend fun init() {
assert(!finalized) { "VM is already finalized" }
- setServer(server.copy(state = ServerState.ACTIVE))
+ server = server.copy(state = ServerState.ACTIVE)
initialized = true
}
@@ -295,7 +301,7 @@ class HypervisorVirtDriver(
ServerState.SHUTOFF
else
ServerState.ERROR
- setServer(server.copy(state = serverState))
+ server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
index 996bd8eb..97842f18 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
@@ -1,11 +1,12 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
class HypervisorView(
var server: Server,
- val hypervisor: HypervisorImage,
var numberOfActiveServers: Int,
var availableMemory: Long
-)
+) {
+ lateinit var driver: VirtDriver
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index a50292a7..6fb821d7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -8,11 +8,12 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
class SimpleVirtProvisioningService(
@@ -46,15 +47,7 @@ class SimpleVirtProvisioningService(
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage(hypervisorMonitor)
- val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
- val server = deployedNode.server!!
- val hvView = HypervisorView(
- server,
- hypervisorImage,
- 0,
- server.flavor.memorySize
- )
- hypervisors[server] = hvView
+ provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
}
}
}
@@ -65,21 +58,29 @@ class SimpleVirtProvisioningService(
requestCycle()
}
+ private var call: Job? = null
+
private fun requestCycle() {
- ctx.domain.launch {
+ if (call != null) {
+ return
+ }
+
+ val call = ctx.domain.launch {
schedule()
}
+ call.invokeOnCompletion { this.call = null }
+ this.call = call
}
private suspend fun schedule() {
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
- val selectedNode = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break
+ val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break
try {
println("Spawning ${imageInstance.image}")
incomingImages -= imageInstance
- imageInstance.server = selectedNode.server.serviceRegistry[VirtDriver.Key].spawn(
+ imageInstance.server = selectedHv.driver.spawn(
imageInstance.image,
imageInstance.monitor,
imageInstance.flavor
@@ -91,21 +92,15 @@ class SimpleVirtProvisioningService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("${server.uid} ${server.state} ${hypervisors[server]}")
+ override fun stateChanged(server: Server, previousState: ServerState) {
when (server.state) {
ServerState.ACTIVE -> {
- val hv = hypervisors[server] ?: return
- availableHypervisors += hv
-
- server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
- override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
- hv.numberOfActiveServers = numberOfActiveServers
- hv.availableMemory = availableMemory
- }
- })
-
- requestCycle()
+ val hvView = HypervisorView(
+ server,
+ 0,
+ server.flavor.memorySize
+ )
+ hypervisors[server] = hvView
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
@@ -116,6 +111,15 @@ class SimpleVirtProvisioningService(
}
}
+ override fun servicePublished(server: Server, key: ServiceKey<*>) {
+ if (key == VirtDriver.Key) {
+ val hv = hypervisors[server] ?: return
+ hv.driver = server.services[VirtDriver]
+ availableHypervisors += hv
+ requestCycle()
+ }
+ }
+
data class ImageView(
val image: Image,
val monitor: ServerMonitor,
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 166e93b8..c5c0441c 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -59,12 +59,12 @@ internal class SimpleBareMetalDriverTest {
val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
val monitor = object : NodeMonitor {
- override suspend fun onUpdate(node: Node, previousState: NodeState) {
+ override fun stateChanged(node: Node, previousState: NodeState) {
println(node)
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}] $server")
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ println("$server")
finalState = server.state
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index ef19427e..9cbb9baa 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -54,7 +54,7 @@ internal class SimpleProvisioningServiceTest {
root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ override fun stateChanged(server: Server, previousState: ServerState) {
println(server)
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index 57a7150e..9ceaf704 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -71,8 +70,8 @@ internal class HypervisorTest {
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
val monitor = object : NodeMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}]: $server")
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ println("$server")
}
}
@@ -89,7 +88,7 @@ internal class HypervisorTest {
delay(5)
val flavor = Flavor(1, 0)
- val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver]
+ val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
vmDriver.spawn(workloadA, monitor, flavor)
vmDriver.spawn(workloadB, monitor, flavor)
}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
index a036a705..75aa778f 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
@@ -25,10 +25,15 @@
package com.atlarge.opendc.core.services
/**
- * A service registry for a datacenter zone.
+ * An immutable service registry interface.
*/
public interface ServiceRegistry {
/**
+ * The keys in this registry.
+ */
+ public val keys: Collection<ServiceKey<*>>
+
+ /**
* Determine if this map contains the service with the specified [ServiceKey].
*
* @param key The key of the service to check for.
@@ -41,12 +46,18 @@ public interface ServiceRegistry {
*
* @param key The key of the service to obtain.
* @return The references to the service.
- * @throws IllegalArgumentException if the key does not exists in the map.
+ * @throws IllegalArgumentException if the key does not exist in the map.
*/
public operator fun <T : Any> get(key: ServiceKey<T>): T
/**
- * Register the specified [ServiceKey] in this registry.
+ * Return the result of associating the specified [service] with the given [key] in this registry.
*/
- public operator fun <T : Any> set(key: ServiceKey<T>, service: T): ServiceRegistry
+ public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry
}
+
+/**
+ * Construct an empty [ServiceRegistry].
+ */
+@Suppress("FunctionName")
+public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap())
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
index e3fa171d..0686ebaf 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
@@ -27,22 +27,18 @@ package com.atlarge.opendc.core.services
/**
* Default implementation of the [ServiceRegistry] interface.
*/
-public class ServiceRegistryImpl : ServiceRegistry {
- /**
- * The map containing the registered services.
- */
- private val services: MutableMap<ServiceKey<*>, Any> = mutableMapOf()
+internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry {
+ override val keys: Collection<ServiceKey<*>>
+ get() = map.keys
- override fun <T : Any> set(key: ServiceKey<T>, service: T) {
- services[key] = service
- }
-
- override fun contains(key: ServiceKey<*>): Boolean = key in services
+ override fun contains(key: ServiceKey<*>): Boolean = key in map
override fun <T : Any> get(key: ServiceKey<T>): T {
@Suppress("UNCHECKED_CAST")
- return services[key] as T
+ return map[key] as T
}
- override fun toString(): String = services.toString()
+ override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry = ServiceRegistryImpl(map.plus(key to service))
+
+ override fun toString(): String = map.toString()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 40cb9719..0f4d0c1b 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -21,8 +20,8 @@ class Sc20Monitor(
outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("${simulationContext.clock.instant()} ${server.uid} ${server.state}")
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ println("${server.uid} ${server.state}")
if (server.state == ServerState.ERROR) {
failed++
}
@@ -36,7 +35,7 @@ class Sc20Monitor(
hostServer: Server
) {
// Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.serviceRegistry[BareMetalDriver.Key]
+ val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 09b6592e..efc85653 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -39,7 +39,6 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
-import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 0d4bd125..ab9f272f 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -34,7 +34,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -89,9 +89,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
-
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc18-platform", listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index ae0ba550..c6a393e1 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import java.io.BufferedReader
import java.io.File
@@ -119,8 +119,7 @@ class Sc20ClusterEnvironmentReader(
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc20-platform", listOf(
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index a954a308..07309341 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -103,8 +103,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc20-platform", listOf(
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 008cd1ee..a055a3fe 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -294,42 +294,44 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
- when (server.state) {
- ServerState.ACTIVE -> {
- val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
- rootListener.taskStarted(task)
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- val task = taskByServer.remove(server) ?: throw IllegalStateException()
- val job = task.job
- task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
- job.tasks.remove(task)
- available += task.host!!
- activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
- rootListener.taskFinished(task)
-
- // Add job roots to the scheduling queue
- for (dependent in task.dependents) {
- if (dependent.state != TaskStatus.READY) {
- continue
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ domain.launch {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
}
- incomingTasks += dependent
- rootListener.taskReady(dependent)
- }
+ if (job.isFinished) {
+ finishJob(job)
+ }
- if (job.isFinished) {
- finishJob(job)
+ requestCycle()
}
-
- requestCycle()
+ else -> throw IllegalStateException()
}
- else -> throw IllegalStateException()
}
}