summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-trace/build.gradle.kts21
-rw-r--r--simulator/opendc-trace/opendc-trace-core/build.gradle.kts32
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt34
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt76
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt84
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt73
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt52
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt139
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt77
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt44
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt157
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt30
-rw-r--r--simulator/settings.gradle.kts1
13 files changed, 820 insertions, 0 deletions
diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-trace/build.gradle.kts
new file mode 100644
index 00000000..a1a751a2
--- /dev/null
+++ b/simulator/opendc-trace/build.gradle.kts
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts
new file mode 100644
index 00000000..3db6669a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Event tracing library for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+}
+
+dependencies {
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}")
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt
new file mode 100644
index 00000000..1f4bb267
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * Base class for events reported by the OpenDC tracing library.
+ */
+public abstract class Event(timestamp: Long = Long.MIN_VALUE) {
+ /**
+ * The timestamp at which the event has occurred.
+ */
+ public var timestamp: Long = timestamp
+ internal set
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
new file mode 100644
index 00000000..ac2b5e9b
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * A stream of [Event]s.
+ */
+public interface EventStream : AutoCloseable {
+ /**
+ * Register the specified [action] to be performed on every event in the stream.
+ */
+ public fun onEvent(action: (Event) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed on events of type [E].
+ */
+ public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed on errors.
+ */
+ public fun onError(action: (Throwable) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed when the stream is closed.
+ */
+ public fun onClose(action: Runnable)
+
+ /**
+ * Unregister the specified [action].
+ *
+ * @return `true` if an action was unregistered, `false` otherwise.
+ */
+ public fun remove(action: Any): Boolean
+
+ /**
+ * Start the processing of events in the current coroutine.
+ *
+ * @throws IllegalStateException if the stream was already started.
+ */
+ public suspend fun start()
+
+ /**
+ * Release all resources associated with this stream.
+ *
+ * @throws IllegalStateException if the stream was already stopped.
+ */
+ public override fun close()
+}
+
+/**
+ * Register the specified [action] to be performed on events of type [E].
+ */
+public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) {
+ onEvent(E::class.java, action)
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
new file mode 100644
index 00000000..4f978f4f
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+import org.opendc.trace.core.internal.EventTracerImpl
+import java.time.Clock
+
+/**
+ * An [EventTracer] is responsible for recording the events that occur in a system.
+ */
+public interface EventTracer : AutoCloseable {
+ /**
+ * The [Clock] used to measure the timestamp and duration of the events.
+ */
+ public val clock: Clock
+
+ /**
+ * Determine whether the specified [Event] class is currently enabled in any of the active recordings.
+ *
+ * @return `true` if the event is enabled, `false` otherwise.
+ */
+ public fun isEnabled(type: Class<out Event>): Boolean
+
+ /**
+ * Commit the specified [event] to the appropriate event streams.
+ */
+ public fun commit(event: Event)
+
+ /**
+ * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer].
+ */
+ public fun openRecording(): RecordingStream
+
+ /**
+ * Terminate the lifecycle of the [EventTracer] and close its associated event streams.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [EventTracer] instance.
+ *
+ * @param clock The [Clock] used to measure the timestamps.
+ */
+ @JvmName("create")
+ public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock)
+ }
+}
+
+/**
+ * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings.
+ *
+ * @return `true` if the event is enabled, `false` otherwise.
+ */
+public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java)
+
+/**
+ * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams.
+ */
+public inline fun <reified E : Event> EventTracer.commit(block: () -> E) {
+ if (isEnabled<E>()) {
+ commit(block())
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
new file mode 100644
index 00000000..84dcc61a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.channels.sendBlocking
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.callbackFlow
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun EventStream.asFlow(): Flow<Event> = callbackFlow {
+ onEvent { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ onClose { channel.close() }
+ awaitClose { this@asFlow.close() }
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow {
+ onEvent { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ start()
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow {
+ onEvent<E> { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ onClose { channel.close() }
+ awaitClose { this@asTypedFlow.close() }
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow {
+ onEvent<E> { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ start()
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
new file mode 100644
index 00000000..f49e7c49
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * A recording stream that produces events from an [EventTracer].
+ */
+public interface RecordingStream : EventStream {
+ /**
+ * Enable recording of the specified event [type].
+ */
+ public fun enable(type: Class<out Event>)
+
+ /**
+ * Disable recording of the specified event [type]
+ */
+ public fun disable(type: Class<out Event>)
+}
+
+/**
+ * Enable recording of events of type [E].
+ */
+public inline fun <reified E : Event> RecordingStream.enable() {
+ enable(E::class.java)
+}
+
+/**
+ * Disable recording of events of type [E].
+ */
+public inline fun <reified E : Event> RecordingStream.disable() {
+ enable(E::class.java)
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
new file mode 100644
index 00000000..1887ad7a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.trace.core.Event
+import org.opendc.trace.core.EventStream
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+
+/**
+ * Base implementation of the [EventStream] implementation.
+ */
+internal abstract class AbstractEventStream : EventStream {
+ /**
+ * The state of the stream.
+ */
+ protected var state = StreamState.Pending
+
+ /**
+ * The event actions to dispatch to.
+ */
+ private val eventActions = mutableListOf<EventDispatcher>()
+
+ /**
+ * The error actions to use.
+ */
+ private val errorActions = mutableListOf<(Throwable) -> Unit>()
+
+ /**
+ * The close actions to use.
+ */
+ private val closeActions = mutableListOf<Runnable>()
+
+ /**
+ * The continuation that is invoked when the stream closes.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * Dispatch the specified [event] to this stream.
+ */
+ fun dispatch(event: Event) {
+ val actions = eventActions
+
+ // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
+ for (action in actions) {
+ if (!action.accepts(event)) {
+ continue
+ }
+
+ try {
+ action(event)
+ } catch (e: Exception) {
+ handleError(e)
+ }
+ }
+ }
+
+ /**
+ * Handle the specified [throwable] that occurred while dispatching an event.
+ */
+ private fun handleError(throwable: Throwable) {
+ val actions = errorActions
+
+ // Default exception handler
+ if (actions.isEmpty()) {
+ throwable.printStackTrace()
+ return
+ }
+
+ for (action in actions) {
+ action(throwable)
+ }
+ }
+
+ override fun onEvent(action: (Event) -> Unit) {
+ eventActions += EventDispatcher(null, action)
+ }
+
+ override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) {
+ @Suppress("UNCHECKED_CAST") // This cast must succeed
+ eventActions += EventDispatcher(type, action as (Event) -> Unit)
+ }
+
+ override fun onError(action: (Throwable) -> Unit) {
+ errorActions += action
+ }
+
+ override fun onClose(action: Runnable) {
+ closeActions += action
+ }
+
+ override fun remove(action: Any): Boolean {
+ return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action)
+ }
+
+ override suspend fun start() {
+ check(state == StreamState.Pending) { "Stream has already started/closed" }
+
+ state = StreamState.Started
+
+ return suspendCancellableCoroutine { cont -> this.cont = cont }
+ }
+
+ override fun close() {
+ if (state != StreamState.Closed) {
+ return
+ }
+
+ state = StreamState.Closed
+ cont?.resume(Unit)
+
+ val actions = closeActions
+ for (action in actions) {
+ action.run()
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
new file mode 100644
index 00000000..8b6de75e
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+
+/**
+ * The [Dispatcher] is responsible for dispatching events onto configured actions.
+ */
+internal class Dispatcher {
+ /**
+ * The event actions to dispatch to.
+ */
+ private val eventActions = mutableListOf<EventDispatcher>()
+
+ /**
+ * The error actions to use.
+ */
+ private val errorActions = mutableListOf<(Throwable) -> Unit>()
+
+ /**
+ * Dispatch the specified [event].
+ */
+ fun dispatch(event: Event) {
+ val actions = eventActions
+
+ // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
+ for (action in actions) {
+ if (!action.accepts(event)) {
+ continue
+ }
+
+ try {
+ action(event)
+ } catch (e: Exception) {
+ handleError(e)
+ }
+ }
+ }
+
+ /**
+ * Handle the specified [throwable] that occurred while dispatching an event.
+ */
+ private fun handleError(throwable: Throwable) {
+ val actions = errorActions
+
+ // Default exception handler
+ if (actions.isEmpty()) {
+ throwable.printStackTrace()
+ return
+ }
+
+ for (action in actions) {
+ action(throwable)
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
new file mode 100644
index 00000000..b2a662eb
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+
+/**
+ * A dispatcher responsible for conditionally dispatching an event.
+ */
+internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) {
+ /**
+ * Determine whether this dispatcher accepts the specified event.
+ */
+ fun accepts(event: Event): Boolean {
+ return type == null || type.isAssignableFrom(event.javaClass)
+ }
+
+ /**
+ * Invoke the specified [event] on this action.
+ */
+ operator fun invoke(event: Event) {
+ action(event)
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
new file mode 100644
index 00000000..e85d0779
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+import org.opendc.trace.core.EventTracer
+import org.opendc.trace.core.RecordingStream
+import java.lang.reflect.Modifier
+import java.time.Clock
+import java.util.*
+
+/**
+ * Default implementation of the [EventTracer] interface.
+ */
+internal class EventTracerImpl(override val clock: Clock) : EventTracer {
+ /**
+ * The set of enabled events.
+ */
+ private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>()
+
+ /**
+ * The event streams created by the tracer.
+ */
+ private val streams = WeakHashMap<Stream, Unit>()
+
+ /**
+ * A flag to indicate that the stream is closed.
+ */
+ private var isClosed: Boolean = false
+
+ override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type)
+
+ override fun commit(event: Event) {
+ val type = event.javaClass
+
+ // Assign timestamp if not set
+ if (event.timestamp == Long.MIN_VALUE) {
+ event.timestamp = clock.millis()
+ }
+
+ if (!isEnabled(type) || isClosed) {
+ return
+ }
+
+ val streams = enabledEvents[type] ?: return
+ for (stream in streams) {
+ stream.dispatch(event)
+ }
+ }
+
+ override fun openRecording(): RecordingStream = Stream()
+
+ override fun close() {
+ isClosed = true
+
+ val streams = streams
+ for ((stream, _) in streams) {
+ stream.close()
+ }
+
+ enabledEvents.clear()
+ }
+
+ /**
+ * Enable the specified [type] for the given [stream].
+ */
+ private fun enableFor(type: Class<out Event>, stream: Stream) {
+ val res = enabledEvents.computeIfAbsent(type) { mutableListOf() }
+ res.add(stream)
+ }
+
+ /**
+ * Disable the specified [type] for the given [stream].
+ */
+ private fun disableFor(type: Class<out Event>, stream: Stream) {
+ enabledEvents[type]?.remove(stream)
+ }
+
+ /**
+ * The [RecordingStream] associated with this [EventTracer] implementation.
+ */
+ private inner class Stream : AbstractEventStream(), RecordingStream {
+ /**
+ * The set of enabled events for this stream.
+ */
+ private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>()
+
+ init {
+ streams[this] = Unit
+ }
+
+ override fun enable(type: Class<out Event>) {
+ validateEventClass(type)
+
+ if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) {
+ enableFor(type, this)
+ }
+ }
+
+ override fun disable(type: Class<out Event>) {
+ validateEventClass(type)
+
+ if (enabledEvents.remove(type) != null && state == StreamState.Started) {
+ disableFor(type, this)
+ }
+ }
+
+ override suspend fun start() {
+ val enabledEvents = enabledEvents
+ for ((event, _) in enabledEvents) {
+ enableFor(event, this)
+ }
+
+ super.start()
+ }
+
+ override fun close() {
+ val enabledEvents = enabledEvents
+ for ((event, _) in enabledEvents) {
+ disableFor(event, this)
+ }
+
+ // Remove this stream from the active streams
+ streams.remove(this)
+
+ super.close()
+ }
+
+ /**
+ * Validate the specified event subclass.
+ */
+ private fun validateEventClass(type: Class<out Event>) {
+ require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" }
+ require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" }
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
new file mode 100644
index 00000000..9f411e0d
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+/**
+ * The state of a [Stream].
+ */
+internal enum class StreamState {
+ Pending, Started, Closed
+}
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index 935a18d0..e6f42574 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -32,4 +32,5 @@ include(":opendc-runner-web")
include(":opendc-simulator:opendc-simulator-core")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
+include(":opendc-trace:opendc-trace-core")
include(":opendc-utils")