summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-21 22:04:31 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:51:27 +0100
commit76bfeb44c5a02be143c152c52bc1029cff360744 (patch)
treebe467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc
parentbc64182612ad06f15bff5b48637ed7d241e293b2 (diff)
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt5
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt72
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt)31
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt)14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt)5
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt)2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt)26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt36
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt3
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt7
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt51
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt8
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt13
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt96
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt76
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt9
-rw-r--r--opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt40
21 files changed, 320 insertions, 186 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index c8caaca6..e0a491c8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
/**
- * Represents the execution context in which an bootable [Image] runs on a [Server].
+ * Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index 8b8d1596..7cb4c0c5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -33,7 +33,7 @@ import java.util.UUID
/**
* A bare-metal compute node.
*/
-data class Node(
+public data class Node(
/**
* The unique identifier of the node.
*/
@@ -45,7 +45,7 @@ data class Node(
public override val name: String,
/**
- * Meta data of the node.
+ * Metadata of the node.
*/
public val metadata: Map<String, Any>,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 5d1db378..41cec291 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -38,6 +38,11 @@ import java.util.UUID
*/
public interface BareMetalDriver : Powerable, FailureDomain {
/**
+ * The [Node] that is controlled by this driver.
+ */
+ public val node: Flow<Node>
+
+ /**
* The amount of work done by the machine in percentage with respect to the total amount of processing power
* available.
*/
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 49c3fa2e..67069c03 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.scanReduce
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -96,33 +99,40 @@ public class SimpleBareMetalDriver(
/**
* The flow containing the load of the server.
*/
- private val usageSignal = StateFlow(0.0)
+ private val usageState = StateFlow(0.0)
/**
* The machine state.
*/
- private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)
- set(value) {
+ private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events))
+
+ override val node: Flow<Node> = nodeState
+
+ override val usage: Flow<Double> = usageState
+
+ override val powerDraw: Flow<Double> = powerModel(this)
+
+ init {
+ @OptIn(ExperimentalCoroutinesApi::class)
+ nodeState.scanReduce { field, value ->
if (field.state != value.state) {
events.emit(NodeEvent.StateChanged(value, field.state))
}
- if (field.server != null && value.server != null && field.server!!.state != value.server.state) {
- serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state))
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
}
- field = value
- }
-
- override val usage: Flow<Double> = usageSignal
-
- override val powerDraw: Flow<Double> = powerModel(this)
+ value
+ }.launchIn(domain)
+ }
override suspend fun init(): Node = withContext(domain.coroutineContext) {
- node
+ nodeState.value
}
override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
return@withContext node
}
@@ -139,12 +149,13 @@ public class SimpleBareMetalDriver(
events
)
- node = node.copy(state = NodeState.BOOT, server = server)
+ nodeState.value = node.copy(state = NodeState.BOOT, server = server)
serverContext = BareMetalServerContext(events)
- return@withContext node
+ return@withContext nodeState.value
}
override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
return@withContext node
}
@@ -153,7 +164,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- node = node.copy(state = NodeState.SHUTOFF, server = null)
+ nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null)
return@withContext node
}
@@ -163,11 +174,11 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- node = node.copy(image = image)
- return@withContext node
+ nodeState.value = nodeState.value.copy(image = image)
+ return@withContext nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
@@ -175,7 +186,7 @@ public class SimpleBareMetalDriver(
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
- get() = node.server!!
+ get() = nodeState.value.server!!
private val job = domain.launch {
delay(1) // TODO Introduce boot time
@@ -193,15 +204,15 @@ public class SimpleBareMetalDriver(
*/
suspend fun cancel(fail: Boolean) {
if (fail)
- domain.cancel(ShutdownException(cause = Exception("Random failure")))
+ job.cancel(ShutdownException(cause = Exception("Random failure")))
else
- domain.cancel(ShutdownException())
+ job.cancel(ShutdownException())
job.join()
}
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
val server = server.copy(services = server.services.put(key, service))
- node = node.copy(server = server)
+ nodeState.value = nodeState.value.copy(server = server)
events.emit(ServerEvent.ServicePublished(server, key))
}
@@ -209,24 +220,24 @@ public class SimpleBareMetalDriver(
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- node = node.copy(state = NodeState.ACTIVE, server = server)
+ nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server)
}
override suspend fun exit(cause: Throwable?) {
finalized = true
- val serverState =
+ val newServerState =
if (cause == null || (cause is ShutdownException && cause.cause == null))
ServerState.SHUTOFF
else
ServerState.ERROR
- val nodeState =
+ val newNodeState =
if (cause == null || (cause is ShutdownException && cause.cause != null))
- node.state
+ nodeState.value.state
else
NodeState.ERROR
- val server = server.copy(state = serverState)
- node = node.copy(state = nodeState, server = server)
+ val server = server.copy(state = newServerState)
+ nodeState.value = nodeState.value.copy(state = newNodeState, server = server)
}
private var flush: Job? = null
@@ -256,7 +267,7 @@ public class SimpleBareMetalDriver(
}
}
- usageSignal.value = totalUsage / cpus.size
+ usageState.value = totalUsage / cpus.size
try {
delay(duration)
@@ -269,7 +280,7 @@ public class SimpleBareMetalDriver(
// Flush the load if the do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
- usageSignal.value = 0.0
+ usageState.value = 0.0
}
flush!!.invokeOnCompletion {
flush = null
@@ -289,5 +300,6 @@ public class SimpleBareMetalDriver(
override suspend fun fail() {
serverContext?.cancel(fail = true)
+ domain.cancel()
}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 3c77d57a..69b0124d 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -22,32 +22,37 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.workflows.monitor
+package com.atlarge.opendc.compute.virt
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
+import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
- * An interface for monitoring the progression of workflows.
+ * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
+ * into several virtual guest machines.
*/
-public interface WorkflowMonitor {
+public class Hypervisor(
/**
- * This method is invoked when a job has become active.
+ * The unique identifier of the hypervisor.
*/
- public suspend fun onJobStart(job: Job, time: Long)
+ override val uid: UUID,
/**
- * This method is invoked when a job has finished processing.
+ * The optional name of the hypervisor.
*/
- public suspend fun onJobFinish(job: Job, time: Long)
+ override val name: String,
/**
- * This method is invoked when a task of a job has started processing.
+ * Metadata of the hypervisor.
*/
- public suspend fun onTaskStart(job: Job, task: Task, time: Long)
+ public val metadata: Map<String, Any>,
/**
- * This method is invoked when a task has finished processing.
+ * The events that are emitted by the hypervisor.
*/
- public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long)
+ public val events: Flow<HypervisorEvent>
+) : Identity {
+ override fun hashCode(): Int = uid.hashCode()
+ override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index ccbe8b3c..3230c2ba 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -22,12 +22,14 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
/**
* An event that is emitted by a [VirtDriver].
*/
-public sealed class VirtDriverEvent {
+public sealed class HypervisorEvent {
/**
* The driver that emitted the event.
*/
@@ -40,7 +42,11 @@ public sealed class VirtDriverEvent {
* @property numberOfActiveServers The number of active servers.
* @property availableMemory The available memory, in MB.
*/
- public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent()
+ public data class VmsUpdated(
+ override val driver: VirtDriver,
+ public val numberOfActiveServers: Int,
+ public val availableMemory: Long
+ ) : HypervisorEvent()
/**
* This event is emitted when a slice is finished.
@@ -55,5 +61,5 @@ public sealed class VirtDriverEvent {
public val requestedBurst: Long,
public val grantedBurst: Long,
public val numberOfDeployedImages: Int
- ) : VirtDriverEvent()
+ ) : HypervisorEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 1eb0e0ff..c21b002d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -22,10 +22,11 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.coroutineScope
@@ -42,7 +43,7 @@ object HypervisorImage : Image {
override suspend fun invoke(ctx: ServerContext) {
coroutineScope {
- val driver = HypervisorVirtDriver(ctx, this)
+ val driver = SimpleVirtDriver(ctx, this)
ctx.publishService(VirtDriver.Key, driver)
// Suspend image until it is cancelled
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
index 926234b5..0586ae00 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
@@ -1,3 +1,3 @@
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 0b4a7109..fc4c7634 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
@@ -37,8 +37,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
@@ -62,7 +61,7 @@ import kotlin.math.min
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-class HypervisorVirtDriver(
+class SimpleVirtDriver(
private val hostContext: ServerContext,
private val coroutineScope: CoroutineScope
) : VirtDriver {
@@ -85,9 +84,9 @@ class HypervisorVirtDriver(
/**
* The [EventFlow] to emit the events.
*/
- internal val eventFlow = EventFlow<VirtDriverEvent>()
+ internal val eventFlow = EventFlow<HypervisorEvent>()
- override val events: Flow<VirtDriverEvent> = eventFlow
+ override val events: Flow<HypervisorEvent> = eventFlow
override suspend fun spawn(
image: Image,
@@ -106,7 +105,7 @@ class HypervisorVirtDriver(
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events, simulationContext.domain))
- eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
@@ -223,7 +222,7 @@ class HypervisorVirtDriver(
}
}
- eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
}
this.call = call
}
@@ -312,7 +311,7 @@ class HypervisorVirtDriver(
availableMemory += server.flavor.memorySize
vms.remove(this)
events.close()
- eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
@@ -322,7 +321,14 @@ class HypervisorVirtDriver(
this.burst = burst
requests = cpus.asSequence()
.take(burst.size)
- .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+ .mapIndexed { i, cpu ->
+ CpuRequest(
+ this,
+ cpu,
+ burst[i],
+ limit[i]
+ )
+ }
.toList()
// Wait until the burst has been run or the coroutine is cancelled
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index 296f170e..d7ae0c12 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
import kotlinx.coroutines.flow.Flow
import java.util.UUID
@@ -39,7 +40,7 @@ public interface VirtDriver {
/**
* The events emitted by the driver.
*/
- public val events: Flow<VirtDriverEvent>
+ public val events: Flow<HypervisorEvent>
/**
* Spawn the given [Image] on the compute resource of this driver.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 8365f8c9..8393dfa9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
+import com.atlarge.opendc.compute.virt.HypervisorImage
+import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
private val ctx: SimulationContext,
private val provisioningService: ProvisioningService
-) : VirtProvisioningService {
+) : VirtProvisioningService, CoroutineScope by ctx.domain {
/**
* The hypervisors that have been launched by the service.
*/
@@ -45,18 +49,17 @@ class SimpleVirtProvisioningService(
private val activeImages: MutableSet<ImageView> = mutableSetOf()
init {
- ctx.domain.launch {
+ launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
val node = provisioningService.deploy(node, hypervisorImage)
node.server!!.events.onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState)
- is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
- }
+ when (event) {
+ is ServerEvent.StateChanged -> stateChanged(event.server)
+ is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
}
- .launchIn(ctx.domain)
+ }.collect()
}
}
}
@@ -64,8 +67,8 @@ class SimpleVirtProvisioningService(
override suspend fun deploy(
image: Image,
flavor: Flavor
- ) {
- val vmInstance = ImageView(image, flavor)
+ ): Server = suspendCancellableCoroutine { cont ->
+ val vmInstance = ImageView(image, flavor, cont)
incomingImages += vmInstance
requestCycle()
}
@@ -77,7 +80,7 @@ class SimpleVirtProvisioningService(
return
}
- val call = ctx.domain.launch {
+ val call = launch {
schedule()
}
call.invokeOnCompletion { this.call = null }
@@ -92,10 +95,12 @@ class SimpleVirtProvisioningService(
try {
println("Spawning ${imageInstance.image}")
incomingImages -= imageInstance
- imageInstance.server = selectedHv.driver.spawn(
+ val server = selectedHv.driver.spawn(
imageInstance.image,
imageInstance.flavor
)
+ imageInstance.server = server
+ imageInstance.continuation.resume(server)
activeImages += imageInstance
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
@@ -103,7 +108,7 @@ class SimpleVirtProvisioningService(
}
}
- private fun stateChanged(server: Server, previousState: ServerState) {
+ private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
val hvView = HypervisorView(
@@ -134,6 +139,7 @@ class SimpleVirtProvisioningService(
data class ImageView(
val image: Image,
val flavor: Flavor,
+ val continuation: Continuation<Server>,
var server: Server? = null
)
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index da72d742..12543ce3 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
@@ -16,5 +17,5 @@ interface VirtProvisioningService {
* @param image The image to be deployed.
* @param flavor The flavor of the machine instance to run this [image] on.
*/
- public suspend fun deploy(image: Image, flavor: Flavor)
+ public suspend fun deploy(image: Image, flavor: Flavor): Server
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index d86045c0..bcaafb59 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -30,6 +30,7 @@ import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.virt.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
@@ -75,8 +76,10 @@ internal class HypervisorTest {
val flavor = Flavor(1, 0)
val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
- vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this)
- vmDriver.spawn(workloadB, flavor)
+ val vmA = vmDriver.spawn(workloadA, flavor)
+ vmA.events.onEach { println(it) }.launchIn(this)
+ val vmB = vmDriver.spawn(workloadB, flavor)
+ vmB.events.onEach { println(it) }.launchIn(this)
}
runBlocking {
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index d5e1404a..b0182ab3 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.WorkflowEvent
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
@@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
@@ -62,28 +62,6 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
-
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- println("Job ${job.uid} started")
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- finished += 1
- println("Jobs $finished/$total finished (${job.tasks.size} tasks)")
-
- if (finished == total) {
- token.send(true)
- }
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -106,6 +84,27 @@ fun main(args: Array<String>) {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> {
+ println("Job ${event.job.uid} started")
+ }
+ is WorkflowEvent.JobFinished -> {
+ finished += 1
+ println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)")
+
+ if (finished == total) {
+ token.send(true)
+ }
+ }
+ }
+ }
+ .collect()
+ }
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
@@ -115,7 +114,7 @@ fun main(args: Array<String>) {
val (time, job) = reader.next()
total += 1
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 0f4d0c1b..e18bbe30 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
@@ -12,7 +10,7 @@ import java.io.FileWriter
class Sc20Monitor(
destination: String
-) : HypervisorMonitor, ServerMonitor, Closeable {
+) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
private var failed: Int = 0
@@ -20,14 +18,14 @@ class Sc20Monitor(
outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- override fun stateChanged(server: Server, previousState: ServerState) {
+ fun stateChanged(server: Server) {
println("${server.uid} ${server.state}")
if (server.state == ServerState.ERROR) {
failed++
}
}
- override suspend fun onSliceFinish(
+ suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 4273c39e..96033ea7 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
@@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
@@ -107,11 +110,10 @@ fun main(args: Array<String>) {
println(simulationContext.clock.instant())
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
-
val scheduler = SimpleVirtProvisioningService(
AvailableMemoryAllocationPolicy(),
simulationContext,
- bareMetalProvisioner,
+ bareMetalProvisioner
)
val faultInjectorDomain = root.newDomain(name = "failures")
@@ -131,8 +133,11 @@ fun main(args: Array<String>) {
while (reader.hasNext()) {
val (time, workload) = reader.next()
delay(max(0, time - simulationContext.clock.millis()))
- chan.send(Unit)
- scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory))
+ launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory))
+ server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ }
}
println(simulationContext.clock.instant())
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
index b444f91c..1cb2de97 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
@@ -24,10 +24,9 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
-class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) {
+class JobState(val job: Job, val submittedAt: Long) {
/**
* A flag to indicate whether this job is finished.
*/
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index a055a3fe..7a20363c 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -25,13 +25,13 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy
@@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
@@ -58,7 +63,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, ServerMonitor {
+) : WorkflowService, CoroutineScope by domain {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -167,6 +172,7 @@ class StageWorkflowService(
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private val resourceFilterPolicy: ResourceFilterPolicy.Logic
private val resourceSelectionPolicy: Comparator<Node>
+ private val eventFlow = EventFlow<WorkflowEvent>()
init {
domain.launch {
@@ -183,9 +189,11 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
+ override val events: Flow<WorkflowEvent> = eventFlow
+
+ override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
+ val jobInstance = JobState(job, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -217,6 +225,7 @@ class StageWorkflowService(
/**
* Perform a scheduling cycle immediately.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
internal suspend fun schedule() {
// J2 Create list of eligible jobs
val iterator = incomingJobs.iterator()
@@ -232,7 +241,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -280,10 +289,13 @@ class StageWorkflowService(
// T4 Submit task to machine
available -= host
instance.state = TaskStatus.ACTIVE
-
- val newHost = provisioningService.deploy(host, instance.task.image, this)
+ val newHost = provisioningService.deploy(host, instance.task.image)
+ val server = newHost.server!!
instance.host = newHost
- taskByServer[newHost.server!!] = instance
+ taskByServer[server] = instance
+ server.events
+ .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
+ .launchIn(this)
activeTasks += instance
taskQueue.poll()
@@ -294,50 +306,48 @@ class StageWorkflowService(
}
}
- override fun stateChanged(server: Server, previousState: ServerState) {
- domain.launch {
- when (server.state) {
- ServerState.ACTIVE -> {
- val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
- rootListener.taskStarted(task)
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- val task = taskByServer.remove(server) ?: throw IllegalStateException()
- val job = task.job
- task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
- job.tasks.remove(task)
- available += task.host!!
- activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
- rootListener.taskFinished(task)
-
- // Add job roots to the scheduling queue
- for (dependent in task.dependents) {
- if (dependent.state != TaskStatus.READY) {
- continue
- }
-
- incomingTasks += dependent
- rootListener.taskReady(dependent)
+ private suspend fun stateChanged(server: Server) {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
}
- if (job.isFinished) {
- finishJob(job)
- }
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
+ }
- requestCycle()
+ if (job.isFinished) {
+ finishJob(job)
}
- else -> throw IllegalStateException()
+
+ requestCycle()
}
+ else -> throw IllegalStateException()
}
}
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
new file mode 100644
index 00000000..2ca5a19d
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
@@ -0,0 +1,76 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
+
+/**
+ * An event emitted by the [WorkflowService].
+ */
+public sealed class WorkflowEvent {
+ /**
+ * The [WorkflowService] that emitted the event.
+ */
+ public abstract val service: WorkflowService
+
+ /**
+ * This event is emitted when a job has become active.
+ */
+ public data class JobStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a job has finished processing.
+ */
+ public data class JobFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index 524f4f9e..38ea49c4 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -36,9 +36,14 @@ import java.util.UUID
*/
public interface WorkflowService {
/**
+ * Thie events emitted by the workflow scheduler.
+ */
+ public val events: Flow<WorkflowEvent>
+
+ /**
* Submit the specified [Job] to the workflow service for scheduling.
*/
- public suspend fun submit(job: Job, monitor: WorkflowMonitor)
+ public suspend fun submit(job: Job)
/**
* The service key for the workflow scheduler.
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 19e56482..5ee6d5e6 100644
--- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
@@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- jobsStarted++
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- jobsFinished++
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- tasksStarted++
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- tasksFinished++
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> jobsStarted++
+ is WorkflowEvent.JobFinished -> jobsFinished++
+ is WorkflowEvent.TaskStarted -> tasksStarted++
+ is WorkflowEvent.TaskFinished -> tasksFinished++
+ }
+ }
+ .collect()
+ }
+
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
@@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val (time, job) = reader.next()
jobsSubmitted++
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}