From 10f71541cd2c72e12f1b2325ee4f25e38a10e0ef Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 5 Mar 2021 16:26:06 +0100 Subject: compute: Convert Server to stateful interface This change converts the Server data class which can be used as a stateful object to control an instance running in the cloud. --- .../main/kotlin/org/opendc/compute/core/Server.kt | 47 ++++++------ .../org/opendc/compute/core/ServerWatcher.kt | 39 ++++++++++ .../org/opendc/compute/simulator/ClientServer.kt | 86 ++++++++++++++++++++++ .../simulator/SimVirtProvisioningService.kt | 45 +++++++---- .../org/opendc/compute/simulator/SimHostTest.kt | 22 +++++- .../experiments/capelin/ExperimentHelpers.kt | 15 ++-- .../capelin/monitor/ExperimentMonitor.kt | 3 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 3 +- .../org/opendc/runner/web/WebExperimentMonitor.kt | 3 +- .../workflows/service/StageWorkflowService.kt | 19 ++--- .../workflows/service/WorkflowSchedulerMode.kt | 12 ++- 11 files changed, 224 insertions(+), 70 deletions(-) create mode 100644 simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt create mode 100644 simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt index 1fb5679a..ff212613 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt @@ -22,51 +22,54 @@ package org.opendc.compute.core -import kotlinx.coroutines.flow.Flow import org.opendc.compute.core.image.Image import org.opendc.core.resource.Resource -import org.opendc.core.resource.TagContainer -import java.util.UUID /** - * A server instance that is running on some physical or virtual machine. + * A stateful object representing a server instance that is running on some physical or virtual machine. */ -public data class Server( +public interface Server : Resource { /** - * The unique identifier of the server. + * The name of the server. */ - public override val uid: UUID, + public override val name: String /** - * The optional name of the server. + * The flavor of the server. */ - public override val name: String, + public val flavor: Flavor /** - * The tags of this server. + * The image of the server. */ - public override val tags: TagContainer, + public val image: Image /** - * The hardware configuration of the server. + * The tags assigned to the server. */ - public val flavor: Flavor, + public override val tags: Map /** - * The image running on the server. + * The last known state of the server. */ - public val image: Image, + public val state: ServerState /** - * The last known state of the server. + * Register the specified [ServerWatcher] to watch the state of the server. + * + * @param watcher The watcher to register for the server. + */ + public fun watch(watcher: ServerWatcher) + + /** + * De-register the specified [ServerWatcher] from the server to stop it from receiving events. + * + * @param watcher The watcher to de-register from the server. */ - public val state: ServerState, + public fun unwatch(watcher: ServerWatcher) /** - * The events that are emitted by the server. + * Refresh the local state of the resource. */ - public val events: Flow -) : Resource { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Server && uid == other.uid + public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt new file mode 100644 index 00000000..a93a8382 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core + +/** + * An interface used to watch the state of [Server] instances. + */ +public interface ServerWatcher { + /** + * This method is invoked when the state of a [Server] changes. + * + * Note that the state of [server] might not reflect the state as reported by the invocation, as a call to + * [Server.refresh] is required to update its state. + * + * @param server The server whose state has changed. + * @param newState The new state of the server. + */ + public fun onStateChanged(server: Server, newState: ServerState) {} +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt new file mode 100644 index 00000000..6d4fb4ae --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState +import org.opendc.compute.core.ServerWatcher +import org.opendc.compute.core.image.Image +import java.util.* + +/** + * A [Server] implementation that is passed to clients but delegates its implementation to another class. + */ +internal class ClientServer(private val delegate: Server) : Server, ServerWatcher { + private val watchers = mutableListOf() + + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var flavor: Flavor = delegate.flavor + private set + + override var image: Image = delegate.image + private set + + override var tags: Map = delegate.tags.toMap() + private set + + override var state: ServerState = delegate.state + private set + + override fun watch(watcher: ServerWatcher) { + if (watchers.isEmpty()) { + delegate.watch(this) + } + + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers += watcher + + if (watchers.isEmpty()) { + delegate.unwatch(this) + } + } + + override suspend fun refresh() { + name = delegate.name + flavor = delegate.flavor + image = delegate.image + tags = delegate.tags + state = delegate.state + } + + override fun onStateChanged(server: Server, newState: ServerState) { + val watchers = watchers + + for (watcher in watchers) { + watcher.onStateChanged(this, newState) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index ee747a9a..5676a5e9 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -27,10 +27,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState +import org.opendc.compute.core.* import org.opendc.compute.core.image.Image import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeEvent @@ -181,14 +178,11 @@ public class SimVirtProvisioningService( image: Image, flavor: Flavor ): Server { - return Server( + return ServerImpl( uid = UUID(random.nextLong(), random.nextLong()), name = name, - tags = emptyMap(), flavor = flavor, - image = image, - state = ServerState.BUILD, - events = EventFlow() + image = image ) } @@ -255,7 +249,7 @@ public class SimVirtProvisioningService( coroutineScope.launch { try { - cont.resume(server) + cont.resume(ClientServer(server)) selectedHv.driver.spawn(server) activeServers += server @@ -367,9 +361,9 @@ public class SimVirtProvisioningService( } override fun onStateChange(host: Host, server: Server, newState: ServerState) { - val eventFlow = server.events as EventFlow - val newServer = server.copy(state = newState) - eventFlow.emit(ServerEvent.StateChanged(newServer, server.state)) + val serverImpl = server as ServerImpl + serverImpl.state = newState + serverImpl.watchers.forEach { it.onStateChanged(server, newState) } if (newState == ServerState.SHUTOFF) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } @@ -405,4 +399,29 @@ public class SimVirtProvisioningService( } public data class LaunchRequest(val server: Server, val cont: Continuation) + + private class ServerImpl( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + val watchers = mutableListOf() + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override val tags: Map = emptyMap() + + override var state: ServerState = ServerState.BUILD + } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 83e891cb..22d3a7d2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -36,6 +35,7 @@ import org.junit.jupiter.api.assertAll import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.ServerState +import org.opendc.compute.core.ServerWatcher import org.opendc.compute.core.image.Image import org.opendc.compute.core.virt.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider @@ -134,8 +134,8 @@ internal class SimHostTest { } .launchIn(this) - launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) } - launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) } + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } } scope.advanceUntilIdle() @@ -148,4 +148,20 @@ internal class SimHostTest { { assertEquals(1200006, scope.currentTime) } ) } + + private class MockServer( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + override val tags: Map = emptyMap() + override val state: ServerState = ServerState.BUILD + + override fun watch(watcher: ServerWatcher) {} + + override fun unwatch(watcher: ServerWatcher) {} + + override suspend fun refresh() {} + } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 728d6c11..d8f68b7b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,8 +32,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent +import org.opendc.compute.core.* import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService @@ -250,14 +249,12 @@ public suspend fun processTrace( workload.image.tags["required-memory"] as Long ) ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } + + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) } - .collect() + }) } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 8432025b..628a54a9 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -23,6 +23,7 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent @@ -35,7 +36,7 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ - public fun reportVmStateChange(time: Long, server: Server) {} + public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} /** * This method is invoked when the state of a host changes. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 2af43701..e2aab450 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent @@ -53,7 +54,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: private val currentHostEvent = mutableMapOf() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index 2a41be65..6a5b5e32 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -24,6 +24,7 @@ package org.opendc.runner.web import mu.KotlinLogging import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeState import org.opendc.compute.core.virt.Host @@ -40,7 +41,7 @@ public class WebExperimentMonitor : ExperimentMonitor { private val currentHostEvent = mutableMapOf() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index e04c8a4c..7761a793 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -25,15 +25,10 @@ package org.opendc.workflows.service import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState +import org.opendc.compute.core.* import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow @@ -61,7 +56,7 @@ public class StageWorkflowService( jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, taskOrderPolicy: TaskOrderPolicy -) : WorkflowService { +) : WorkflowService, ServerWatcher { /** * The logger instance to use. */ @@ -205,7 +200,7 @@ public class StageWorkflowService( /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private suspend fun requestCycle() = mode.requestCycle() + private fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. @@ -279,9 +274,7 @@ public class StageWorkflowService( instance.server = server taskByServer[server] = instance - server.events - .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(coroutineScope) + server.watch(this@StageWorkflowService) } activeTasks += instance @@ -290,8 +283,8 @@ public class StageWorkflowService( } } - private suspend fun stateChanged(server: Server) { - when (server.state) { + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt index d03adc61..cf8f92e0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.service import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.workflows.service.stage.StagePolicy /** @@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy