From e85a11645a2262e2e6fd1e3570ad001eb805c85f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 2 Mar 2021 21:15:38 +0100 Subject: compute: Separate cloud compute layer from bare-metal layer This change separates the cloud compute layer in OpenDC (e.g., Server) from the bare-metal layer (e.g., Node), such that Node and BareMetalDriver are unaware of the existence of Server and co. --- .../main/kotlin/org/opendc/utils/flow/EventFlow.kt | 28 +++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'simulator/opendc-utils') diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt index 948595b1..10f29f4e 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt @@ -58,12 +58,22 @@ public fun EventFlow(): EventFlow = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl : EventFlow { private var closed: Boolean = false - private val subscribers = HashMap, Unit>() + private val subscribers = mutableListOf>() override fun emit(event: T) { + if (closed) { + return + } + + val it = subscribers.iterator() synchronized(this) { - for ((chan, _) in subscribers) { - chan.offer(event) + while (it.hasNext()) { + val chan = it.next() + if (chan.isClosedForSend) { + it.remove() + } else { + chan.offer(event) + } } } } @@ -72,9 +82,11 @@ private class EventFlowImpl : EventFlow { synchronized(this) { closed = true - for ((chan, _) in subscribers) { + for (chan in subscribers) { chan.close() } + + subscribers.clear() } } @@ -87,9 +99,13 @@ private class EventFlowImpl : EventFlow { } channel = Channel(Channel.UNLIMITED) - subscribers[channel] = Unit + subscribers.add(channel) + } + try { + channel.consumeAsFlow().collect(collector) + } finally { + channel.close() } - channel.consumeAsFlow().collect(collector) } override fun toString(): String = "EventFlow" -- cgit v1.2.3