From b1cf9b2bd9559328c3c9d26e73123e67d2bfea05 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Mar 2020 22:26:15 +0100 Subject: refactor: Rework monitor interfaces --- .../com/atlarge/opendc/compute/core/Server.kt | 3 +- .../opendc/compute/core/execution/ServerContext.kt | 8 +-- .../opendc/compute/core/monitor/ServerMonitor.kt | 13 ++++- .../compute/metal/driver/SimpleBareMetalDriver.kt | 44 ++++++++------- .../opendc/compute/metal/monitor/NodeMonitor.kt | 4 +- .../metal/service/SimpleProvisioningService.kt | 14 ++++- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 26 +++++---- .../opendc/compute/virt/service/HypervisorView.kt | 7 ++- .../virt/service/SimpleVirtProvisioningService.kt | 56 ++++++++++--------- .../metal/driver/SimpleBareMetalDriverTest.kt | 6 +- .../metal/service/SimpleProvisioningServiceTest.kt | 2 +- .../virt/driver/hypervisor/HypervisorTest.kt | 7 +-- .../opendc/core/services/ServiceRegistry.kt | 19 +++++-- .../opendc/core/services/ServiceRegistryImpl.kt | 20 +++---- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 7 +-- .../opendc/experiments/sc20/TestExperiment.kt | 1 - .../environment/sc18/Sc18EnvironmentReader.kt | 6 +- .../sc20/Sc20ClusterEnvironmentReader.kt | 5 +- .../environment/sc20/Sc20EnvironmentReader.kt | 5 +- .../workflows/service/StageWorkflowService.kt | 64 +++++++++++----------- 20 files changed, 174 insertions(+), 143 deletions(-) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 86ec9a5b..31b070a4 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,7 +28,6 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.services.ServiceRegistryImpl import java.util.UUID /** @@ -68,7 +67,7 @@ public data class Server( /** * The services published by this server. */ - public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl() + public val services: ServiceRegistry = ServiceRegistry() ) : Resource { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index b09a5a7d..c8caaca6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.compute.core.execution import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.core.services.ServiceKey /** * Represents the execution context in which an bootable [Image] runs on a [Server]. @@ -44,11 +44,9 @@ public interface ServerContext { public val cpus: List /** - * Publishes the given [service] with key [serviceKey] in the server's registry. + * Publish the specified [service] at the given [ServiceKey]. */ - public suspend fun publishService(serviceKey: AbstractServiceKey, service: T) { - server.serviceRegistry[serviceKey] = service - } + public suspend fun publishService(key: ServiceKey, service: T) /** * Request the specified burst time from the processor cores and suspend execution until a processor core finishes diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt index 26b94ba5..c2b30b9d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt @@ -26,16 +26,25 @@ package com.atlarge.opendc.compute.core.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.core.services.ServiceKey /** * An interface for monitoring the state of a machine. */ public interface ServerMonitor { /** - * This method is invoked when the state of a machine updates. + * This method is synchronously invoked when the state of a machine updates. * * @param server The server which state was updated. * @param previousState The previous state of the server. */ - public suspend fun onUpdate(server: Server, previousState: ServerState) {} + public fun stateChanged(server: Server, previousState: ServerState) {} + + /** + * This method is synchronously invoked when the server publishes a service. + * + * @param server The server that published the service. + * @param key The key of the service that was published. + */ + public fun servicePublished(server: Server, key: ServiceKey<*>) {} } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a8f3d781..46b4c30c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -42,6 +42,8 @@ import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.monitor.NodeMonitor import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -83,20 +85,18 @@ public class SimpleBareMetalDriver( * The machine state. */ private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) + set(value) { + if (field.state != value.state) { + monitor.stateChanged(value, field.state) + } - private suspend fun setNode(value: Node) { - val field = node - if (field.state != value.state) { - monitor.onUpdate(value, field.state) - } + if (field.server != null && value.server != null && field.server!!.state != value.server.state) { + monitor.stateChanged(value.server, field.server!!.state) + } - if (field.server != null && value.server != null && field.server.state != value.server.state) { - monitor.onUpdate(value.server, field.server.state) + field = value } - node = value - } - /** * The flavor that corresponds to this machine. */ @@ -132,11 +132,11 @@ public class SimpleBareMetalDriver( emptyMap(), flavor, node.image, - ServerState.BUILD + ServerState.BUILD, + ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver) ) - server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver - setNode(node.copy(state = NodeState.BOOT, server = server)) + node = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext() return@withContext node } @@ -150,7 +150,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) + node = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -160,7 +160,7 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - setNode(node.copy(image = image)) + node = node.copy(image = image) return@withContext node } @@ -195,11 +195,17 @@ public class SimpleBareMetalDriver( job.join() } + override suspend fun publishService(key: ServiceKey, service: T) { + val server = server.copy(services = server.services.put(key, service)) + node = node.copy(server = server) + monitor.servicePublished(server, key) + } + override suspend fun init() { assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - setNode(node.copy(state = NodeState.ACTIVE, server = server)) + node = node.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { @@ -216,7 +222,7 @@ public class SimpleBareMetalDriver( else NodeState.ERROR val server = server.copy(state = serverState) - setNode(node.copy(state = nodeState, server = server)) + node = node.copy(state = nodeState, server = server) } private var flush: Job? = null @@ -278,8 +284,6 @@ public class SimpleBareMetalDriver( get() = domain override suspend fun fail() { - withContext(domain.coroutineContext) { - serverContext?.cancel(fail = true) - } + serverContext?.cancel(fail = true) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt index f35cf57b..bd4b40d8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt @@ -33,10 +33,10 @@ import com.atlarge.opendc.compute.metal.NodeState */ public interface NodeMonitor : ServerMonitor { /** - * This method is invoked when the state of a bare metal machine updates. + * This method is synchronously invoked when the state of a bare metal machine updates. * * @param node The node for which state was updated. * @param previousState The previous state of the node. */ - public suspend fun onUpdate(node: Node, previousState: NodeState) {} + public fun stateChanged(node: Node, previousState: NodeState) {} } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index d8fe0dd9..e5cd0a77 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -32,6 +32,8 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.metal.monitor.NodeMonitor +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext /** @@ -68,9 +70,15 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService return@withContext newNode } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - withContext(domain.coroutineContext) { - monitors[server]?.onUpdate(server, previousState) + override fun stateChanged(server: Server, previousState: ServerState) { + domain.launch { + monitors[server]?.stateChanged(server, previousState) + } + } + + override fun servicePublished(server: Server, key: ServiceKey<*>) { + domain.launch { + monitors[server]?.servicePublished(server, key) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 1ff33c0c..98d8092c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -39,6 +39,7 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException @@ -248,7 +249,7 @@ class HypervisorVirtDriver( } internal inner class VmServerContext( - override var server: Server, + server: Server, val monitor: ServerMonitor, val domain: Domain ) : ServerManagementContext { @@ -269,21 +270,26 @@ class HypervisorVirtDriver( } } - private suspend fun setServer(value: Server) { - val field = server - if (field.state != value.state) { - monitor.onUpdate(value, field.state) - } + override var server: Server = server + set(value) { + if (field.state != value.state) { + monitor.stateChanged(value, field.state) + } - server = value - } + field = value + } override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) + override suspend fun publishService(key: ServiceKey, service: T) { + server = server.copy(services = server.services.put(key, service)) + monitor.servicePublished(server, key) + } + override suspend fun init() { assert(!finalized) { "VM is already finalized" } - setServer(server.copy(state = ServerState.ACTIVE)) + server = server.copy(state = ServerState.ACTIVE) initialized = true } @@ -295,7 +301,7 @@ class HypervisorVirtDriver( ServerState.SHUTOFF else ServerState.ERROR - setServer(server.copy(state = serverState)) + server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt index 996bd8eb..97842f18 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt @@ -1,11 +1,12 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.VirtDriver class HypervisorView( var server: Server, - val hypervisor: HypervisorImage, var numberOfActiveServers: Int, var availableMemory: Long -) +) { + lateinit var driver: VirtDriver +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index a50292a7..6fb821d7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,11 +8,12 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.Job import kotlinx.coroutines.launch class SimpleVirtProvisioningService( @@ -46,15 +47,7 @@ class SimpleVirtProvisioningService( val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage(hypervisorMonitor) - val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) - val server = deployedNode.server!! - val hvView = HypervisorView( - server, - hypervisorImage, - 0, - server.flavor.memorySize - ) - hypervisors[server] = hvView + provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) } } } @@ -65,21 +58,29 @@ class SimpleVirtProvisioningService( requestCycle() } + private var call: Job? = null + private fun requestCycle() { - ctx.domain.launch { + if (call != null) { + return + } + + val call = ctx.domain.launch { schedule() } + call.invokeOnCompletion { this.call = null } + this.call = call } private suspend fun schedule() { val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - val selectedNode = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break + val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedNode.server.serviceRegistry[VirtDriver.Key].spawn( + imageInstance.server = selectedHv.driver.spawn( imageInstance.image, imageInstance.monitor, imageInstance.flavor @@ -91,21 +92,15 @@ class SimpleVirtProvisioningService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("${server.uid} ${server.state} ${hypervisors[server]}") + override fun stateChanged(server: Server, previousState: ServerState) { when (server.state) { ServerState.ACTIVE -> { - val hv = hypervisors[server] ?: return - availableHypervisors += hv - - server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { - override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { - hv.numberOfActiveServers = numberOfActiveServers - hv.availableMemory = availableMemory - } - }) - - requestCycle() + val hvView = HypervisorView( + server, + 0, + server.flavor.memorySize + ) + hypervisors[server] = hvView } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return @@ -116,6 +111,15 @@ class SimpleVirtProvisioningService( } } + override fun servicePublished(server: Server, key: ServiceKey<*>) { + if (key == VirtDriver.Key) { + val hv = hypervisors[server] ?: return + hv.driver = server.services[VirtDriver] + availableHypervisors += hv + requestCycle() + } + } + data class ImageView( val image: Image, val monitor: ServerMonitor, diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 166e93b8..c5c0441c 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -59,12 +59,12 @@ internal class SimpleBareMetalDriverTest { val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) val monitor = object : NodeMonitor { - override suspend fun onUpdate(node: Node, previousState: NodeState) { + override fun stateChanged(node: Node, previousState: NodeState) { println(node) } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}] $server") + override fun stateChanged(server: Server, previousState: ServerState) { + println("$server") finalState = server.state } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ef19427e..9cbb9baa 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -54,7 +54,7 @@ internal class SimpleProvisioningServiceTest { root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1000, 2) val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override fun stateChanged(server: Server, previousState: ServerState) { println(server) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 57a7150e..9ceaf704 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -71,8 +70,8 @@ internal class HypervisorTest { val workloadA = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 2_000, 1) val monitor = object : NodeMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}]: $server") + override fun stateChanged(server: Server, previousState: ServerState) { + println("$server") } } @@ -89,7 +88,7 @@ internal class HypervisorTest { delay(5) val flavor = Flavor(1, 0) - val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] + val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] vmDriver.spawn(workloadA, monitor, flavor) vmDriver.spawn(workloadB, monitor, flavor) } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt index a036a705..75aa778f 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt @@ -25,9 +25,14 @@ package com.atlarge.opendc.core.services /** - * A service registry for a datacenter zone. + * An immutable service registry interface. */ public interface ServiceRegistry { + /** + * The keys in this registry. + */ + public val keys: Collection> + /** * Determine if this map contains the service with the specified [ServiceKey]. * @@ -41,12 +46,18 @@ public interface ServiceRegistry { * * @param key The key of the service to obtain. * @return The references to the service. - * @throws IllegalArgumentException if the key does not exists in the map. + * @throws IllegalArgumentException if the key does not exist in the map. */ public operator fun get(key: ServiceKey): T /** - * Register the specified [ServiceKey] in this registry. + * Return the result of associating the specified [service] with the given [key] in this registry. */ - public operator fun set(key: ServiceKey, service: T): ServiceRegistry + public fun put(key: ServiceKey, service: T): ServiceRegistry } + +/** + * Construct an empty [ServiceRegistry]. + */ +@Suppress("FunctionName") +public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap()) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt index e3fa171d..0686ebaf 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt @@ -27,22 +27,18 @@ package com.atlarge.opendc.core.services /** * Default implementation of the [ServiceRegistry] interface. */ -public class ServiceRegistryImpl : ServiceRegistry { - /** - * The map containing the registered services. - */ - private val services: MutableMap, Any> = mutableMapOf() +internal class ServiceRegistryImpl(private val map: Map, Any>) : ServiceRegistry { + override val keys: Collection> + get() = map.keys - override fun set(key: ServiceKey, service: T) { - services[key] = service - } - - override fun contains(key: ServiceKey<*>): Boolean = key in services + override fun contains(key: ServiceKey<*>): Boolean = key in map override fun get(key: ServiceKey): T { @Suppress("UNCHECKED_CAST") - return services[key] as T + return map[key] as T } - override fun toString(): String = services.toString() + override fun put(key: ServiceKey, service: T): ServiceRegistry = ServiceRegistryImpl(map.plus(key to service)) + + override fun toString(): String = map.toString() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 40cb9719..0f4d0c1b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -21,8 +20,8 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("${simulationContext.clock.instant()} ${server.uid} ${server.state}") + override fun stateChanged(server: Server, previousState: ServerState) { + println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } @@ -36,7 +35,7 @@ class Sc20Monitor( hostServer: Server ) { // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.serviceRegistry[BareMetalDriver.Key] + val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 09b6592e..efc85653 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -39,7 +39,6 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 0d4bd125..ab9f272f 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -34,7 +34,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -89,9 +89,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService - + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc18-platform", listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index ae0ba550..c6a393e1 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File @@ -119,8 +119,7 @@ class Sc20ClusterEnvironmentReader( provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc20-platform", listOf( diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index a954a308..07309341 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -103,8 +103,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb provisioningService.create(node) } - val serviceRegistry = ServiceRegistryImpl() - serviceRegistry[ProvisioningService.Key] = provisioningService + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( UUID.randomUUID(), "sc20-platform", listOf( diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 008cd1ee..a055a3fe 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -294,42 +294,44 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue + override fun stateChanged(server: Server, previousState: ServerState) { + domain.launch { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) } - incomingTasks += dependent - rootListener.taskReady(dependent) - } + if (job.isFinished) { + finishJob(job) + } - if (job.isFinished) { - finishJob(job) + requestCycle() } - - requestCycle() + else -> throw IllegalStateException() } - else -> throw IllegalStateException() } } -- cgit v1.2.3