summaryrefslogtreecommitdiff
path: root/simulator/opendc-utils/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-02 21:15:38 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 16:13:13 +0100
commite85a11645a2262e2e6fd1e3570ad001eb805c85f (patch)
treec5a6958d89ab1cfc6b557f2a50446d603bb05b57 /simulator/opendc-utils/src/main
parent58c73773a75a0e0a8f85217e2e97c64128ce8ab8 (diff)
compute: Separate cloud compute layer from bare-metal layer
This change separates the cloud compute layer in OpenDC (e.g., Server) from the bare-metal layer (e.g., Node), such that Node and BareMetalDriver are unaware of the existence of Server and co.
Diffstat (limited to 'simulator/opendc-utils/src/main')
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt28
1 files changed, 22 insertions, 6 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
index 948595b1..10f29f4e 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
@@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private class EventFlowImpl<T> : EventFlow<T> {
private var closed: Boolean = false
- private val subscribers = HashMap<SendChannel<T>, Unit>()
+ private val subscribers = mutableListOf<SendChannel<T>>()
override fun emit(event: T) {
+ if (closed) {
+ return
+ }
+
+ val it = subscribers.iterator()
synchronized(this) {
- for ((chan, _) in subscribers) {
- chan.offer(event)
+ while (it.hasNext()) {
+ val chan = it.next()
+ if (chan.isClosedForSend) {
+ it.remove()
+ } else {
+ chan.offer(event)
+ }
}
}
}
@@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> {
synchronized(this) {
closed = true
- for ((chan, _) in subscribers) {
+ for (chan in subscribers) {
chan.close()
}
+
+ subscribers.clear()
}
}
@@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> {
}
channel = Channel(Channel.UNLIMITED)
- subscribers[channel] = Unit
+ subscribers.add(channel)
+ }
+ try {
+ channel.consumeAsFlow().collect(collector)
+ } finally {
+ channel.close()
}
- channel.consumeAsFlow().collect(collector)
}
override fun toString(): String = "EventFlow"