summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-05 16:26:06 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 16:13:16 +0100
commit10f71541cd2c72e12f1b2325ee4f25e38a10e0ef (patch)
tree5cd19515be73755911cbfdff0d477532e0dee02d /simulator
parent249a272702bb79a901848ed4957d0992e82b3f92 (diff)
compute: Convert Server to stateful interface
This change converts the Server data class which can be used as a stateful object to control an instance running in the cloud.
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt47
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt39
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt86
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt45
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt22
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt15
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt3
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt3
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt19
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt12
11 files changed, 224 insertions, 70 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt
index 1fb5679a..ff212613 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt
@@ -22,51 +22,54 @@
package org.opendc.compute.core
-import kotlinx.coroutines.flow.Flow
import org.opendc.compute.core.image.Image
import org.opendc.core.resource.Resource
-import org.opendc.core.resource.TagContainer
-import java.util.UUID
/**
- * A server instance that is running on some physical or virtual machine.
+ * A stateful object representing a server instance that is running on some physical or virtual machine.
*/
-public data class Server(
+public interface Server : Resource {
/**
- * The unique identifier of the server.
+ * The name of the server.
*/
- public override val uid: UUID,
+ public override val name: String
/**
- * The optional name of the server.
+ * The flavor of the server.
*/
- public override val name: String,
+ public val flavor: Flavor
/**
- * The tags of this server.
+ * The image of the server.
*/
- public override val tags: TagContainer,
+ public val image: Image
/**
- * The hardware configuration of the server.
+ * The tags assigned to the server.
*/
- public val flavor: Flavor,
+ public override val tags: Map<String, String>
/**
- * The image running on the server.
+ * The last known state of the server.
*/
- public val image: Image,
+ public val state: ServerState
/**
- * The last known state of the server.
+ * Register the specified [ServerWatcher] to watch the state of the server.
+ *
+ * @param watcher The watcher to register for the server.
+ */
+ public fun watch(watcher: ServerWatcher)
+
+ /**
+ * De-register the specified [ServerWatcher] from the server to stop it from receiving events.
+ *
+ * @param watcher The watcher to de-register from the server.
*/
- public val state: ServerState,
+ public fun unwatch(watcher: ServerWatcher)
/**
- * The events that are emitted by the server.
+ * Refresh the local state of the resource.
*/
- public val events: Flow<ServerEvent>
-) : Resource {
- override fun hashCode(): Int = uid.hashCode()
- override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
+ public suspend fun refresh()
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt
new file mode 100644
index 00000000..a93a8382
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerWatcher.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core
+
+/**
+ * An interface used to watch the state of [Server] instances.
+ */
+public interface ServerWatcher {
+ /**
+ * This method is invoked when the state of a [Server] changes.
+ *
+ * Note that the state of [server] might not reflect the state as reported by the invocation, as a call to
+ * [Server.refresh] is required to update its state.
+ *
+ * @param server The server whose state has changed.
+ * @param newState The new state of the server.
+ */
+ public fun onStateChanged(server: Server, newState: ServerState) {}
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt
new file mode 100644
index 00000000..6d4fb4ae
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator
+
+import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.ServerWatcher
+import org.opendc.compute.core.image.Image
+import java.util.*
+
+/**
+ * A [Server] implementation that is passed to clients but delegates its implementation to another class.
+ */
+internal class ClientServer(private val delegate: Server) : Server, ServerWatcher {
+ private val watchers = mutableListOf<ServerWatcher>()
+
+ override val uid: UUID = delegate.uid
+
+ override var name: String = delegate.name
+ private set
+
+ override var flavor: Flavor = delegate.flavor
+ private set
+
+ override var image: Image = delegate.image
+ private set
+
+ override var tags: Map<String, String> = delegate.tags.toMap()
+ private set
+
+ override var state: ServerState = delegate.state
+ private set
+
+ override fun watch(watcher: ServerWatcher) {
+ if (watchers.isEmpty()) {
+ delegate.watch(this)
+ }
+
+ watchers += watcher
+ }
+
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers += watcher
+
+ if (watchers.isEmpty()) {
+ delegate.unwatch(this)
+ }
+ }
+
+ override suspend fun refresh() {
+ name = delegate.name
+ flavor = delegate.flavor
+ image = delegate.image
+ tags = delegate.tags
+ state = delegate.state
+ }
+
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ val watchers = watchers
+
+ for (watcher in watchers) {
+ watcher.onStateChanged(this, newState)
+ }
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index ee747a9a..5676a5e9 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -27,10 +27,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.*
import org.opendc.compute.core.image.Image
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeEvent
@@ -181,14 +178,11 @@ public class SimVirtProvisioningService(
image: Image,
flavor: Flavor
): Server {
- return Server(
+ return ServerImpl(
uid = UUID(random.nextLong(), random.nextLong()),
name = name,
- tags = emptyMap(),
flavor = flavor,
- image = image,
- state = ServerState.BUILD,
- events = EventFlow()
+ image = image
)
}
@@ -255,7 +249,7 @@ public class SimVirtProvisioningService(
coroutineScope.launch {
try {
- cont.resume(server)
+ cont.resume(ClientServer(server))
selectedHv.driver.spawn(server)
activeServers += server
@@ -367,9 +361,9 @@ public class SimVirtProvisioningService(
}
override fun onStateChange(host: Host, server: Server, newState: ServerState) {
- val eventFlow = server.events as EventFlow<ServerEvent>
- val newServer = server.copy(state = newState)
- eventFlow.emit(ServerEvent.StateChanged(newServer, server.state))
+ val serverImpl = server as ServerImpl
+ serverImpl.state = newState
+ serverImpl.watchers.forEach { it.onStateChanged(server, newState) }
if (newState == ServerState.SHUTOFF) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
@@ -405,4 +399,29 @@ public class SimVirtProvisioningService(
}
public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
+
+ private class ServerImpl(
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image
+ ) : Server {
+ val watchers = mutableListOf<ServerWatcher>()
+
+ override fun watch(watcher: ServerWatcher) {
+ watchers += watcher
+ }
+
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers -= watcher
+ }
+
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override val tags: Map<String, String> = emptyMap()
+
+ override var state: ServerState = ServerState.BUILD
+ }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 83e891cb..22d3a7d2 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -36,6 +35,7 @@ import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.ServerWatcher
import org.opendc.compute.core.image.Image
import org.opendc.compute.core.virt.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
@@ -134,8 +134,8 @@ internal class SimHostTest {
}
.launchIn(this)
- launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) }
- launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) }
+ launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
+ launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
}
scope.advanceUntilIdle()
@@ -148,4 +148,20 @@ internal class SimHostTest {
{ assertEquals(1200006, scope.currentTime) }
)
}
+
+ private class MockServer(
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image
+ ) : Server {
+ override val tags: Map<String, String> = emptyMap()
+ override val state: ServerState = ServerState.BUILD
+
+ override fun watch(watcher: ServerWatcher) {}
+
+ override fun unwatch(watcher: ServerWatcher) {}
+
+ override suspend fun refresh() {}
+ }
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 728d6c11..d8f68b7b 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -32,8 +32,7 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.ServerEvent
+import org.opendc.compute.core.*
import org.opendc.compute.core.metal.NODE_CLUSTER
import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.service.ProvisioningService
@@ -250,14 +249,12 @@ public suspend fun processTrace(
workload.image.tags["required-memory"] as Long
)
)
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(clock.millis(), it.server)
- }
+
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
}
- .collect()
+ })
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 8432025b..628a54a9 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -23,6 +23,7 @@
package org.opendc.experiments.capelin.monitor
import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.Host
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
@@ -35,7 +36,7 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
- public fun reportVmStateChange(time: Long, server: Server) {}
+ public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
/**
* This method is invoked when the state of a host changes.
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index 2af43701..e2aab450 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -24,6 +24,7 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.Host
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
@@ -53,7 +54,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
if (startTime < 0) {
startTime = time
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index 2a41be65..6a5b5e32 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -24,6 +24,7 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeState
import org.opendc.compute.core.virt.Host
@@ -40,7 +41,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
if (startTime < 0) {
startTime = time
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index e04c8a4c..7761a793 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -25,15 +25,10 @@ package org.opendc.workflows.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.*
import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
@@ -61,7 +56,7 @@ public class StageWorkflowService(
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
taskOrderPolicy: TaskOrderPolicy
-) : WorkflowService {
+) : WorkflowService, ServerWatcher {
/**
* The logger instance to use.
*/
@@ -205,7 +200,7 @@ public class StageWorkflowService(
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private suspend fun requestCycle() = mode.requestCycle()
+ private fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
@@ -279,9 +274,7 @@ public class StageWorkflowService(
instance.server = server
taskByServer[server] = instance
- server.events
- .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(coroutineScope)
+ server.watch(this@StageWorkflowService)
}
activeTasks += instance
@@ -290,8 +283,8 @@ public class StageWorkflowService(
}
}
- private suspend fun stateChanged(server: Server) {
- when (server.state) {
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
index d03adc61..cf8f92e0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@ package org.opendc.workflows.service
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.workflows.service.stage.StagePolicy
/**
@@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
/**
* Request a new scheduling cycle to be performed.
*/
- public suspend fun requestCycle()
+ public fun requestCycle()
}
/**
@@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
*/
public object Interactive : WorkflowSchedulerMode() {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
- yield()
- scheduler.schedule()
+ override fun requestCycle() {
+ scheduler.coroutineScope.launch { scheduler.schedule() }
}
}
@@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
@@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()