summaryrefslogtreecommitdiff
path: root/simulator/opendc-utils/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:33:13 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:36:18 +0100
commit5b0eaf76ec00192c755b268b7655f6463c5bc62f (patch)
treecf7c04ce21d0da964172ef35deb45c843aea8b98 /simulator/opendc-utils/src/main
parentccd1f96f8568978c80aa0b9a100ca6158ade34ba (diff)
compute: Migrate compute service simulator to OpenTelemetry
This change updates the compute service simulator to use OpenTelemetry for reporting metrics of the (simulated) hosts as opposed to using custom event flows. This approach is more generic, flexible and possibly offers better performance as we can collect metrics of all services in a single sweep, as opposed to listening to several services and each invoking the handlers.
Diffstat (limited to 'simulator/opendc-utils/src/main')
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt112
1 files changed, 0 insertions, 112 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
deleted file mode 100644
index 10f29f4e..00000000
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.utils.flow
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.coroutines.flow.consumeAsFlow
-
-/**
- * A [Flow] that can be used to emit events.
- */
-public interface EventFlow<T> : Flow<T> {
- /**
- * Emit the specified [event].
- */
- public fun emit(event: T)
-
- /**
- * Close the flow.
- */
- public fun close()
-}
-
-/**
- * Creates a new [EventFlow].
- */
-@Suppress("FunctionName")
-public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
-
-/**
- * Internal implementation of the [EventFlow] class.
- */
-@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-private class EventFlowImpl<T> : EventFlow<T> {
- private var closed: Boolean = false
- private val subscribers = mutableListOf<SendChannel<T>>()
-
- override fun emit(event: T) {
- if (closed) {
- return
- }
-
- val it = subscribers.iterator()
- synchronized(this) {
- while (it.hasNext()) {
- val chan = it.next()
- if (chan.isClosedForSend) {
- it.remove()
- } else {
- chan.offer(event)
- }
- }
- }
- }
-
- override fun close() {
- synchronized(this) {
- closed = true
-
- for (chan in subscribers) {
- chan.close()
- }
-
- subscribers.clear()
- }
- }
-
- @InternalCoroutinesApi
- override suspend fun collect(collector: FlowCollector<T>) {
- val channel: Channel<T>
- synchronized(this) {
- if (closed) {
- return
- }
-
- channel = Channel(Channel.UNLIMITED)
- subscribers.add(channel)
- }
- try {
- channel.consumeAsFlow().collect(collector)
- } finally {
- channel.close()
- }
- }
-
- override fun toString(): String = "EventFlow"
-}