diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-02 21:15:38 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-07 16:13:13 +0100 |
| commit | e85a11645a2262e2e6fd1e3570ad001eb805c85f (patch) | |
| tree | c5a6958d89ab1cfc6b557f2a50446d603bb05b57 /simulator/opendc-utils/src/main | |
| parent | 58c73773a75a0e0a8f85217e2e97c64128ce8ab8 (diff) | |
compute: Separate cloud compute layer from bare-metal layer
This change separates the cloud compute layer in OpenDC (e.g., Server)
from the bare-metal layer (e.g., Node), such that Node and
BareMetalDriver are unaware of the existence of Server and co.
Diffstat (limited to 'simulator/opendc-utils/src/main')
| -rw-r--r-- | simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt index 948595b1..10f29f4e 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt @@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl<T> : EventFlow<T> { private var closed: Boolean = false - private val subscribers = HashMap<SendChannel<T>, Unit>() + private val subscribers = mutableListOf<SendChannel<T>>() override fun emit(event: T) { + if (closed) { + return + } + + val it = subscribers.iterator() synchronized(this) { - for ((chan, _) in subscribers) { - chan.offer(event) + while (it.hasNext()) { + val chan = it.next() + if (chan.isClosedForSend) { + it.remove() + } else { + chan.offer(event) + } } } } @@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> { synchronized(this) { closed = true - for ((chan, _) in subscribers) { + for (chan in subscribers) { chan.close() } + + subscribers.clear() } } @@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> { } channel = Channel(Channel.UNLIMITED) - subscribers[channel] = Unit + subscribers.add(channel) + } + try { + channel.consumeAsFlow().collect(collector) + } finally { + channel.close() } - channel.consumeAsFlow().collect(collector) } override fun toString(): String = "EventFlow" |
