diff options
Diffstat (limited to 'simulator/opendc-trace')
12 files changed, 819 insertions, 0 deletions
diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-trace/build.gradle.kts new file mode 100644 index 00000000..a1a751a2 --- /dev/null +++ b/simulator/opendc-trace/build.gradle.kts @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts new file mode 100644 index 00000000..3db6669a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Event tracing library for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}") +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt new file mode 100644 index 00000000..1f4bb267 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * Base class for events reported by the OpenDC tracing library. + */ +public abstract class Event(timestamp: Long = Long.MIN_VALUE) { + /** + * The timestamp at which the event has occurred. + */ + public var timestamp: Long = timestamp + internal set +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt new file mode 100644 index 00000000..ac2b5e9b --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * A stream of [Event]s. + */ +public interface EventStream : AutoCloseable { + /** + * Register the specified [action] to be performed on every event in the stream. + */ + public fun onEvent(action: (Event) -> Unit) + + /** + * Register the specified [action] to be performed on events of type [E]. + */ + public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) + + /** + * Register the specified [action] to be performed on errors. + */ + public fun onError(action: (Throwable) -> Unit) + + /** + * Register the specified [action] to be performed when the stream is closed. + */ + public fun onClose(action: Runnable) + + /** + * Unregister the specified [action]. + * + * @return `true` if an action was unregistered, `false` otherwise. + */ + public fun remove(action: Any): Boolean + + /** + * Start the processing of events in the current coroutine. + * + * @throws IllegalStateException if the stream was already started. + */ + public suspend fun start() + + /** + * Release all resources associated with this stream. + * + * @throws IllegalStateException if the stream was already stopped. + */ + public override fun close() +} + +/** + * Register the specified [action] to be performed on events of type [E]. + */ +public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) { + onEvent(E::class.java, action) +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt new file mode 100644 index 00000000..4f978f4f --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +import org.opendc.trace.core.internal.EventTracerImpl +import java.time.Clock + +/** + * An [EventTracer] is responsible for recording the events that occur in a system. + */ +public interface EventTracer : AutoCloseable { + /** + * The [Clock] used to measure the timestamp and duration of the events. + */ + public val clock: Clock + + /** + * Determine whether the specified [Event] class is currently enabled in any of the active recordings. + * + * @return `true` if the event is enabled, `false` otherwise. + */ + public fun isEnabled(type: Class<out Event>): Boolean + + /** + * Commit the specified [event] to the appropriate event streams. + */ + public fun commit(event: Event) + + /** + * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer]. + */ + public fun openRecording(): RecordingStream + + /** + * Terminate the lifecycle of the [EventTracer] and close its associated event streams. + */ + public override fun close() + + public companion object { + /** + * Construct a new [EventTracer] instance. + * + * @param clock The [Clock] used to measure the timestamps. + */ + @JvmName("create") + public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock) + } +} + +/** + * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings. + * + * @return `true` if the event is enabled, `false` otherwise. + */ +public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java) + +/** + * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams. + */ +public inline fun <reified E : Event> EventTracer.commit(block: () -> E) { + if (isEnabled<E>()) { + commit(block()) + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt new file mode 100644 index 00000000..84dcc61a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.sendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow + +/** + * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun EventStream.asFlow(): Flow<Event> = callbackFlow { + onEvent { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + onClose { channel.close() } + awaitClose { this@asFlow.close() } +} + +/** + * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow { + onEvent { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + start() +} + +/** + * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow { + onEvent<E> { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + onClose { channel.close() } + awaitClose { this@asTypedFlow.close() } +} + +/** + * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow { + onEvent<E> { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + start() +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt new file mode 100644 index 00000000..f49e7c49 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * A recording stream that produces events from an [EventTracer]. + */ +public interface RecordingStream : EventStream { + /** + * Enable recording of the specified event [type]. + */ + public fun enable(type: Class<out Event>) + + /** + * Disable recording of the specified event [type] + */ + public fun disable(type: Class<out Event>) +} + +/** + * Enable recording of events of type [E]. + */ +public inline fun <reified E : Event> RecordingStream.enable() { + enable(E::class.java) +} + +/** + * Disable recording of events of type [E]. + */ +public inline fun <reified E : Event> RecordingStream.disable() { + enable(E::class.java) +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt new file mode 100644 index 00000000..1887ad7a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.trace.core.Event +import org.opendc.trace.core.EventStream +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume + +/** + * Base implementation of the [EventStream] implementation. + */ +internal abstract class AbstractEventStream : EventStream { + /** + * The state of the stream. + */ + protected var state = StreamState.Pending + + /** + * The event actions to dispatch to. + */ + private val eventActions = mutableListOf<EventDispatcher>() + + /** + * The error actions to use. + */ + private val errorActions = mutableListOf<(Throwable) -> Unit>() + + /** + * The close actions to use. + */ + private val closeActions = mutableListOf<Runnable>() + + /** + * The continuation that is invoked when the stream closes. + */ + private var cont: Continuation<Unit>? = null + + /** + * Dispatch the specified [event] to this stream. + */ + fun dispatch(event: Event) { + val actions = eventActions + + // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) + for (action in actions) { + if (!action.accepts(event)) { + continue + } + + try { + action(event) + } catch (e: Exception) { + handleError(e) + } + } + } + + /** + * Handle the specified [throwable] that occurred while dispatching an event. + */ + private fun handleError(throwable: Throwable) { + val actions = errorActions + + // Default exception handler + if (actions.isEmpty()) { + throwable.printStackTrace() + return + } + + for (action in actions) { + action(throwable) + } + } + + override fun onEvent(action: (Event) -> Unit) { + eventActions += EventDispatcher(null, action) + } + + override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) { + @Suppress("UNCHECKED_CAST") // This cast must succeed + eventActions += EventDispatcher(type, action as (Event) -> Unit) + } + + override fun onError(action: (Throwable) -> Unit) { + errorActions += action + } + + override fun onClose(action: Runnable) { + closeActions += action + } + + override fun remove(action: Any): Boolean { + return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action) + } + + override suspend fun start() { + check(state == StreamState.Pending) { "Stream has already started/closed" } + + state = StreamState.Started + + return suspendCancellableCoroutine { cont -> this.cont = cont } + } + + override fun close() { + if (state != StreamState.Closed) { + return + } + + state = StreamState.Closed + cont?.resume(Unit) + + val actions = closeActions + for (action in actions) { + action.run() + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt new file mode 100644 index 00000000..8b6de75e --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event + +/** + * The [Dispatcher] is responsible for dispatching events onto configured actions. + */ +internal class Dispatcher { + /** + * The event actions to dispatch to. + */ + private val eventActions = mutableListOf<EventDispatcher>() + + /** + * The error actions to use. + */ + private val errorActions = mutableListOf<(Throwable) -> Unit>() + + /** + * Dispatch the specified [event]. + */ + fun dispatch(event: Event) { + val actions = eventActions + + // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) + for (action in actions) { + if (!action.accepts(event)) { + continue + } + + try { + action(event) + } catch (e: Exception) { + handleError(e) + } + } + } + + /** + * Handle the specified [throwable] that occurred while dispatching an event. + */ + private fun handleError(throwable: Throwable) { + val actions = errorActions + + // Default exception handler + if (actions.isEmpty()) { + throwable.printStackTrace() + return + } + + for (action in actions) { + action(throwable) + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt new file mode 100644 index 00000000..b2a662eb --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event + +/** + * A dispatcher responsible for conditionally dispatching an event. + */ +internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) { + /** + * Determine whether this dispatcher accepts the specified event. + */ + fun accepts(event: Event): Boolean { + return type == null || type.isAssignableFrom(event.javaClass) + } + + /** + * Invoke the specified [event] on this action. + */ + operator fun invoke(event: Event) { + action(event) + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt new file mode 100644 index 00000000..e85d0779 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.RecordingStream +import java.lang.reflect.Modifier +import java.time.Clock +import java.util.* + +/** + * Default implementation of the [EventTracer] interface. + */ +internal class EventTracerImpl(override val clock: Clock) : EventTracer { + /** + * The set of enabled events. + */ + private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>() + + /** + * The event streams created by the tracer. + */ + private val streams = WeakHashMap<Stream, Unit>() + + /** + * A flag to indicate that the stream is closed. + */ + private var isClosed: Boolean = false + + override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type) + + override fun commit(event: Event) { + val type = event.javaClass + + // Assign timestamp if not set + if (event.timestamp == Long.MIN_VALUE) { + event.timestamp = clock.millis() + } + + if (!isEnabled(type) || isClosed) { + return + } + + val streams = enabledEvents[type] ?: return + for (stream in streams) { + stream.dispatch(event) + } + } + + override fun openRecording(): RecordingStream = Stream() + + override fun close() { + isClosed = true + + val streams = streams + for ((stream, _) in streams) { + stream.close() + } + + enabledEvents.clear() + } + + /** + * Enable the specified [type] for the given [stream]. + */ + private fun enableFor(type: Class<out Event>, stream: Stream) { + val res = enabledEvents.computeIfAbsent(type) { mutableListOf() } + res.add(stream) + } + + /** + * Disable the specified [type] for the given [stream]. + */ + private fun disableFor(type: Class<out Event>, stream: Stream) { + enabledEvents[type]?.remove(stream) + } + + /** + * The [RecordingStream] associated with this [EventTracer] implementation. + */ + private inner class Stream : AbstractEventStream(), RecordingStream { + /** + * The set of enabled events for this stream. + */ + private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>() + + init { + streams[this] = Unit + } + + override fun enable(type: Class<out Event>) { + validateEventClass(type) + + if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) { + enableFor(type, this) + } + } + + override fun disable(type: Class<out Event>) { + validateEventClass(type) + + if (enabledEvents.remove(type) != null && state == StreamState.Started) { + disableFor(type, this) + } + } + + override suspend fun start() { + val enabledEvents = enabledEvents + for ((event, _) in enabledEvents) { + enableFor(event, this) + } + + super.start() + } + + override fun close() { + val enabledEvents = enabledEvents + for ((event, _) in enabledEvents) { + disableFor(event, this) + } + + // Remove this stream from the active streams + streams.remove(this) + + super.close() + } + + /** + * Validate the specified event subclass. + */ + private fun validateEventClass(type: Class<out Event>) { + require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" } + require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" } + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt new file mode 100644 index 00000000..9f411e0d --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +/** + * The state of a [Stream]. + */ +internal enum class StreamState { + Pending, Started, Closed +} |
