summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-core/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt32
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt19
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt9
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt5
-rw-r--r--simulator/opendc-trace/build.gradle.kts21
-rw-r--r--simulator/opendc-trace/opendc-trace-core/build.gradle.kts32
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt34
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt76
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt84
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt73
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt52
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt139
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt77
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt44
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt157
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt30
-rw-r--r--simulator/opendc-workflows/build.gradle.kts1
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt31
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt23
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt14
-rw-r--r--simulator/settings.gradle.kts1
31 files changed, 1092 insertions, 32 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts
index 9682b50f..ac2dc78d 100644
--- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts
@@ -29,6 +29,7 @@ plugins {
dependencies {
api(project(":opendc-core"))
+ api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging:1.7.9")
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt
new file mode 100644
index 00000000..c1802e64
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.trace.core.Event
+import java.util.*
+
+/**
+ * This event is emitted when a hypervisor has become available.
+ */
+public class HypervisorAvailableEvent(public val uid: UUID) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt
new file mode 100644
index 00000000..1fea21ea
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.trace.core.Event
+import java.util.*
+
+/**
+ * This event is emitted when a hypervisor has become unavailable.
+ */
+public class HypervisorUnavailableEvent(public val uid: UUID) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt
new file mode 100644
index 00000000..662068dd
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.trace.core.Event
+
+/**
+ * This event is emitted when a virtual machine has successfully been scheduled on a hypervisor.
+ */
+public class VmScheduledEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt
new file mode 100644
index 00000000..96103129
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.trace.core.Event
+
+/**
+ * This event is emitted when a virtual machine has stopped running.
+ */
+public class VmStoppedEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt
new file mode 100644
index 00000000..f6b71e22
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.image.Image
+import org.opendc.trace.core.Event
+
+/**
+ * This event is emitted when a virtual machine is submitted to the provisioning service.
+ */
+public class VmSubmissionEvent(public val name: String, public val image: Image, public val flavor: Flavor) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt
new file mode 100644
index 00000000..d0e5c102
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt.service.events
+
+import org.opendc.trace.core.Event
+
+/**
+ * An event that is emitted when the submission is deemed to be invalid.
+ */
+public class VmSubmissionInvalidEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index e83370d7..0144fd69 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -38,7 +38,9 @@ import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
import org.opendc.compute.core.virt.driver.VirtDriver
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.virt.service.VirtProvisioningService
+import org.opendc.compute.core.virt.service.events.*
import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.trace.core.EventTracer
import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
@@ -51,7 +53,8 @@ public class SimVirtProvisioningService(
private val coroutineScope: CoroutineScope,
private val clock: Clock,
private val provisioningService: ProvisioningService,
- public val allocationPolicy: AllocationPolicy
+ public val allocationPolicy: AllocationPolicy,
+ private val tracer: EventTracer
) : VirtProvisioningService {
/**
* The logger instance to use.
@@ -136,6 +139,8 @@ public class SimVirtProvisioningService(
image: Image,
flavor: Flavor
): Server {
+ tracer.commit(VmSubmissionEvent(name, image, flavor))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -191,6 +196,8 @@ public class SimVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
+ tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -231,6 +238,8 @@ public class SimVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
+ tracer.commit(VmScheduledEvent(imageInstance.name))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -252,6 +261,8 @@ public class SimVirtProvisioningService(
if (event.server.state == ServerState.SHUTOFF) {
logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+ tracer.commit(VmStoppedEvent(event.server.name))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -310,6 +321,8 @@ public class SimVirtProvisioningService(
hypervisors[server] = hv
}
+ tracer.commit(HypervisorAvailableEvent(server.uid))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -333,6 +346,8 @@ public class SimVirtProvisioningService(
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
+ tracer.commit(HypervisorUnavailableEvent(hv.uid))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
@@ -359,6 +374,8 @@ public class SimVirtProvisioningService(
hv.driver = hypervisor
availableHypervisors += hv
+ tracer.commit(HypervisorAvailableEvent(hv.uid))
+
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
index 3786eebf..1221c7d3 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
@@ -31,6 +31,7 @@ import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.StageWorkflowService
import org.opendc.workflows.service.WorkflowEvent
import org.opendc.workflows.service.WorkflowSchedulerMode
@@ -59,6 +60,7 @@ public fun main(args: Array<String>) {
val token = Channel<Boolean>()
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
+ val tracer = EventTracer(clock)
val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
@@ -67,6 +69,7 @@ public fun main(args: Array<String>) {
StageWorkflowService(
this,
clock,
+ tracer,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 09f44199..f939738d 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -52,6 +52,7 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FailureDomain
import org.opendc.simulator.failures.FaultInjector
+import org.opendc.trace.core.EventTracer
import java.io.File
import java.time.Clock
import kotlin.math.ln
@@ -140,7 +141,8 @@ public suspend fun createProvisioner(
coroutineScope: CoroutineScope,
clock: Clock,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
+ allocationPolicy: AllocationPolicy,
+ eventTracer: EventTracer
): Pair<ProvisioningService, SimVirtProvisioningService> {
val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
@@ -148,7 +150,7 @@ public suspend fun createProvisioner(
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer)
// Wait for the hypervisors to be spawned
delay(10)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
index 660fc882..8d8d608d 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
@@ -37,6 +37,7 @@ import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import java.io.File
import kotlin.random.Random
@@ -102,12 +103,15 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
+ val tracer = EventTracer(clock)
+
testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
this,
clock,
environment,
- allocationPolicy
+ allocationPolicy,
+ tracer
)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 9c44edfc..72a2484a 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -48,6 +48,7 @@ import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import java.io.File
import java.time.Clock
@@ -97,13 +98,15 @@ class Sc20IntegrationTest {
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
lateinit var scheduler: SimVirtProvisioningService
+ val tracer = EventTracer(clock)
testScope.launch {
val res = createProvisioner(
this,
clock,
environmentReader,
- allocationPolicy
+ allocationPolicy,
+ tracer
)
val bareMetalProvisioner = res.first
scheduler = res.second
@@ -160,13 +163,15 @@ class Sc20IntegrationTest {
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
lateinit var scheduler: SimVirtProvisioningService
+ val tracer = EventTracer(clock)
testScope.launch {
val res = createProvisioner(
this,
clock,
environmentReader,
- allocationPolicy
+ allocationPolicy,
+ tracer
)
scheduler = res.second
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 2117b675..7796019a 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -50,6 +50,7 @@ import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import java.io.File
import kotlin.coroutines.coroutineContext
import kotlin.random.Random
@@ -238,13 +239,15 @@ public class RunnerCli : CliktCommand(name = "runner") {
val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
val environment = TopologyParser(topologies, topologyId)
val monitor = WebExperimentMonitor()
+ val tracer = EventTracer(clock)
testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
this,
clock,
environment,
- allocationPolicy
+ allocationPolicy,
+ tracer
)
val failureDomain = if (operational.getBoolean("failuresEnabled")) {
diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-trace/build.gradle.kts
new file mode 100644
index 00000000..a1a751a2
--- /dev/null
+++ b/simulator/opendc-trace/build.gradle.kts
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts
new file mode 100644
index 00000000..3db6669a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Event tracing library for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+}
+
+dependencies {
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}")
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt
new file mode 100644
index 00000000..1f4bb267
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * Base class for events reported by the OpenDC tracing library.
+ */
+public abstract class Event(timestamp: Long = Long.MIN_VALUE) {
+ /**
+ * The timestamp at which the event has occurred.
+ */
+ public var timestamp: Long = timestamp
+ internal set
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
new file mode 100644
index 00000000..ac2b5e9b
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * A stream of [Event]s.
+ */
+public interface EventStream : AutoCloseable {
+ /**
+ * Register the specified [action] to be performed on every event in the stream.
+ */
+ public fun onEvent(action: (Event) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed on events of type [E].
+ */
+ public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed on errors.
+ */
+ public fun onError(action: (Throwable) -> Unit)
+
+ /**
+ * Register the specified [action] to be performed when the stream is closed.
+ */
+ public fun onClose(action: Runnable)
+
+ /**
+ * Unregister the specified [action].
+ *
+ * @return `true` if an action was unregistered, `false` otherwise.
+ */
+ public fun remove(action: Any): Boolean
+
+ /**
+ * Start the processing of events in the current coroutine.
+ *
+ * @throws IllegalStateException if the stream was already started.
+ */
+ public suspend fun start()
+
+ /**
+ * Release all resources associated with this stream.
+ *
+ * @throws IllegalStateException if the stream was already stopped.
+ */
+ public override fun close()
+}
+
+/**
+ * Register the specified [action] to be performed on events of type [E].
+ */
+public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) {
+ onEvent(E::class.java, action)
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
new file mode 100644
index 00000000..4f978f4f
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+import org.opendc.trace.core.internal.EventTracerImpl
+import java.time.Clock
+
+/**
+ * An [EventTracer] is responsible for recording the events that occur in a system.
+ */
+public interface EventTracer : AutoCloseable {
+ /**
+ * The [Clock] used to measure the timestamp and duration of the events.
+ */
+ public val clock: Clock
+
+ /**
+ * Determine whether the specified [Event] class is currently enabled in any of the active recordings.
+ *
+ * @return `true` if the event is enabled, `false` otherwise.
+ */
+ public fun isEnabled(type: Class<out Event>): Boolean
+
+ /**
+ * Commit the specified [event] to the appropriate event streams.
+ */
+ public fun commit(event: Event)
+
+ /**
+ * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer].
+ */
+ public fun openRecording(): RecordingStream
+
+ /**
+ * Terminate the lifecycle of the [EventTracer] and close its associated event streams.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [EventTracer] instance.
+ *
+ * @param clock The [Clock] used to measure the timestamps.
+ */
+ @JvmName("create")
+ public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock)
+ }
+}
+
+/**
+ * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings.
+ *
+ * @return `true` if the event is enabled, `false` otherwise.
+ */
+public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java)
+
+/**
+ * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams.
+ */
+public inline fun <reified E : Event> EventTracer.commit(block: () -> E) {
+ if (isEnabled<E>()) {
+ commit(block())
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
new file mode 100644
index 00000000..84dcc61a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.channels.sendBlocking
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.callbackFlow
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun EventStream.asFlow(): Flow<Event> = callbackFlow {
+ onEvent { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ onClose { channel.close() }
+ awaitClose { this@asFlow.close() }
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow {
+ onEvent { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ start()
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow {
+ onEvent<E> { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ onClose { channel.close() }
+ awaitClose { this@asTypedFlow.close() }
+}
+
+/**
+ * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow {
+ onEvent<E> { sendBlocking(it) }
+ onError { cancel(CancellationException("API error", it)) }
+ start()
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
new file mode 100644
index 00000000..f49e7c49
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core
+
+/**
+ * A recording stream that produces events from an [EventTracer].
+ */
+public interface RecordingStream : EventStream {
+ /**
+ * Enable recording of the specified event [type].
+ */
+ public fun enable(type: Class<out Event>)
+
+ /**
+ * Disable recording of the specified event [type]
+ */
+ public fun disable(type: Class<out Event>)
+}
+
+/**
+ * Enable recording of events of type [E].
+ */
+public inline fun <reified E : Event> RecordingStream.enable() {
+ enable(E::class.java)
+}
+
+/**
+ * Disable recording of events of type [E].
+ */
+public inline fun <reified E : Event> RecordingStream.disable() {
+ enable(E::class.java)
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
new file mode 100644
index 00000000..1887ad7a
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.trace.core.Event
+import org.opendc.trace.core.EventStream
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+
+/**
+ * Base implementation of the [EventStream] implementation.
+ */
+internal abstract class AbstractEventStream : EventStream {
+ /**
+ * The state of the stream.
+ */
+ protected var state = StreamState.Pending
+
+ /**
+ * The event actions to dispatch to.
+ */
+ private val eventActions = mutableListOf<EventDispatcher>()
+
+ /**
+ * The error actions to use.
+ */
+ private val errorActions = mutableListOf<(Throwable) -> Unit>()
+
+ /**
+ * The close actions to use.
+ */
+ private val closeActions = mutableListOf<Runnable>()
+
+ /**
+ * The continuation that is invoked when the stream closes.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * Dispatch the specified [event] to this stream.
+ */
+ fun dispatch(event: Event) {
+ val actions = eventActions
+
+ // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
+ for (action in actions) {
+ if (!action.accepts(event)) {
+ continue
+ }
+
+ try {
+ action(event)
+ } catch (e: Exception) {
+ handleError(e)
+ }
+ }
+ }
+
+ /**
+ * Handle the specified [throwable] that occurred while dispatching an event.
+ */
+ private fun handleError(throwable: Throwable) {
+ val actions = errorActions
+
+ // Default exception handler
+ if (actions.isEmpty()) {
+ throwable.printStackTrace()
+ return
+ }
+
+ for (action in actions) {
+ action(throwable)
+ }
+ }
+
+ override fun onEvent(action: (Event) -> Unit) {
+ eventActions += EventDispatcher(null, action)
+ }
+
+ override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) {
+ @Suppress("UNCHECKED_CAST") // This cast must succeed
+ eventActions += EventDispatcher(type, action as (Event) -> Unit)
+ }
+
+ override fun onError(action: (Throwable) -> Unit) {
+ errorActions += action
+ }
+
+ override fun onClose(action: Runnable) {
+ closeActions += action
+ }
+
+ override fun remove(action: Any): Boolean {
+ return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action)
+ }
+
+ override suspend fun start() {
+ check(state == StreamState.Pending) { "Stream has already started/closed" }
+
+ state = StreamState.Started
+
+ return suspendCancellableCoroutine { cont -> this.cont = cont }
+ }
+
+ override fun close() {
+ if (state != StreamState.Closed) {
+ return
+ }
+
+ state = StreamState.Closed
+ cont?.resume(Unit)
+
+ val actions = closeActions
+ for (action in actions) {
+ action.run()
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
new file mode 100644
index 00000000..8b6de75e
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+
+/**
+ * The [Dispatcher] is responsible for dispatching events onto configured actions.
+ */
+internal class Dispatcher {
+ /**
+ * The event actions to dispatch to.
+ */
+ private val eventActions = mutableListOf<EventDispatcher>()
+
+ /**
+ * The error actions to use.
+ */
+ private val errorActions = mutableListOf<(Throwable) -> Unit>()
+
+ /**
+ * Dispatch the specified [event].
+ */
+ fun dispatch(event: Event) {
+ val actions = eventActions
+
+ // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
+ for (action in actions) {
+ if (!action.accepts(event)) {
+ continue
+ }
+
+ try {
+ action(event)
+ } catch (e: Exception) {
+ handleError(e)
+ }
+ }
+ }
+
+ /**
+ * Handle the specified [throwable] that occurred while dispatching an event.
+ */
+ private fun handleError(throwable: Throwable) {
+ val actions = errorActions
+
+ // Default exception handler
+ if (actions.isEmpty()) {
+ throwable.printStackTrace()
+ return
+ }
+
+ for (action in actions) {
+ action(throwable)
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
new file mode 100644
index 00000000..b2a662eb
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+
+/**
+ * A dispatcher responsible for conditionally dispatching an event.
+ */
+internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) {
+ /**
+ * Determine whether this dispatcher accepts the specified event.
+ */
+ fun accepts(event: Event): Boolean {
+ return type == null || type.isAssignableFrom(event.javaClass)
+ }
+
+ /**
+ * Invoke the specified [event] on this action.
+ */
+ operator fun invoke(event: Event) {
+ action(event)
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
new file mode 100644
index 00000000..e85d0779
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+import org.opendc.trace.core.Event
+import org.opendc.trace.core.EventTracer
+import org.opendc.trace.core.RecordingStream
+import java.lang.reflect.Modifier
+import java.time.Clock
+import java.util.*
+
+/**
+ * Default implementation of the [EventTracer] interface.
+ */
+internal class EventTracerImpl(override val clock: Clock) : EventTracer {
+ /**
+ * The set of enabled events.
+ */
+ private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>()
+
+ /**
+ * The event streams created by the tracer.
+ */
+ private val streams = WeakHashMap<Stream, Unit>()
+
+ /**
+ * A flag to indicate that the stream is closed.
+ */
+ private var isClosed: Boolean = false
+
+ override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type)
+
+ override fun commit(event: Event) {
+ val type = event.javaClass
+
+ // Assign timestamp if not set
+ if (event.timestamp == Long.MIN_VALUE) {
+ event.timestamp = clock.millis()
+ }
+
+ if (!isEnabled(type) || isClosed) {
+ return
+ }
+
+ val streams = enabledEvents[type] ?: return
+ for (stream in streams) {
+ stream.dispatch(event)
+ }
+ }
+
+ override fun openRecording(): RecordingStream = Stream()
+
+ override fun close() {
+ isClosed = true
+
+ val streams = streams
+ for ((stream, _) in streams) {
+ stream.close()
+ }
+
+ enabledEvents.clear()
+ }
+
+ /**
+ * Enable the specified [type] for the given [stream].
+ */
+ private fun enableFor(type: Class<out Event>, stream: Stream) {
+ val res = enabledEvents.computeIfAbsent(type) { mutableListOf() }
+ res.add(stream)
+ }
+
+ /**
+ * Disable the specified [type] for the given [stream].
+ */
+ private fun disableFor(type: Class<out Event>, stream: Stream) {
+ enabledEvents[type]?.remove(stream)
+ }
+
+ /**
+ * The [RecordingStream] associated with this [EventTracer] implementation.
+ */
+ private inner class Stream : AbstractEventStream(), RecordingStream {
+ /**
+ * The set of enabled events for this stream.
+ */
+ private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>()
+
+ init {
+ streams[this] = Unit
+ }
+
+ override fun enable(type: Class<out Event>) {
+ validateEventClass(type)
+
+ if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) {
+ enableFor(type, this)
+ }
+ }
+
+ override fun disable(type: Class<out Event>) {
+ validateEventClass(type)
+
+ if (enabledEvents.remove(type) != null && state == StreamState.Started) {
+ disableFor(type, this)
+ }
+ }
+
+ override suspend fun start() {
+ val enabledEvents = enabledEvents
+ for ((event, _) in enabledEvents) {
+ enableFor(event, this)
+ }
+
+ super.start()
+ }
+
+ override fun close() {
+ val enabledEvents = enabledEvents
+ for ((event, _) in enabledEvents) {
+ disableFor(event, this)
+ }
+
+ // Remove this stream from the active streams
+ streams.remove(this)
+
+ super.close()
+ }
+
+ /**
+ * Validate the specified event subclass.
+ */
+ private fun validateEventClass(type: Class<out Event>) {
+ require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" }
+ require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" }
+ }
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
new file mode 100644
index 00000000..9f411e0d
--- /dev/null
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.core.internal
+
+/**
+ * The state of a [Stream].
+ */
+internal enum class StreamState {
+ Pending, Started, Closed
+}
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index f61bdac6..fa03508c 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(project(":opendc-core"))
api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 3b4e6eab..91657f83 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.opendc.compute.core.Server
@@ -33,7 +34,9 @@ import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.utils.flow.EventFlow
+import org.opendc.trace.core.EventTracer
+import org.opendc.trace.core.consumeAsFlow
+import org.opendc.trace.core.enable
import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
import org.opendc.workflows.service.stage.job.JobOrderPolicy
import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy
@@ -51,6 +54,7 @@ import java.util.*
public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
+ internal val tracer: EventTracer,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -168,7 +172,6 @@ public class StageWorkflowService(
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private val resourceFilterPolicy: ResourceFilterPolicy.Logic
private val resourceSelectionPolicy: Comparator<Node>
- private val eventFlow = EventFlow<WorkflowEvent>()
init {
coroutineScope.launch {
@@ -185,7 +188,14 @@ public class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override val events: Flow<WorkflowEvent> = eventFlow
+ override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
+ it.enable<WorkflowEvent.JobSubmitted>()
+ it.enable<WorkflowEvent.JobStarted>()
+ it.enable<WorkflowEvent.JobFinished>()
+ it.enable<WorkflowEvent.TaskStarted>()
+ it.enable<WorkflowEvent.TaskFinished>()
+ it.consumeAsFlow().map { event -> event as WorkflowEvent }
+ }
override suspend fun submit(job: Job) {
// J1 Incoming Jobs
@@ -209,6 +219,7 @@ public class StageWorkflowService(
instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
rootListener.jobSubmitted(jobInstance)
+ tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job))
requestCycle()
}
@@ -237,7 +248,7 @@ public class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
+ tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job))
rootListener.jobStarted(jobInstance)
}
@@ -307,12 +318,11 @@ public class StageWorkflowService(
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
- eventFlow.emit(
+ tracer.commit(
WorkflowEvent.TaskStarted(
this@StageWorkflowService,
task.job.job,
- task.task,
- clock.millis()
+ task.task
)
)
rootListener.taskStarted(task)
@@ -325,12 +335,11 @@ public class StageWorkflowService(
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(
+ tracer.commit(
WorkflowEvent.TaskFinished(
this@StageWorkflowService,
task.job.job,
- task.task,
- clock.millis()
+ task.task
)
)
rootListener.taskFinished(task)
@@ -357,7 +366,7 @@ public class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
+ tracer.commit(WorkflowEvent.JobFinished(this, job.job))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
index dadccb50..bcf93562 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
@@ -22,25 +22,33 @@
package org.opendc.workflows.service
+import org.opendc.trace.core.Event
import org.opendc.workflows.workload.Job
import org.opendc.workflows.workload.Task
/**
* An event emitted by the [WorkflowService].
*/
-public sealed class WorkflowEvent {
+public sealed class WorkflowEvent : Event() {
/**
* The [WorkflowService] that emitted the event.
*/
public abstract val service: WorkflowService
/**
+ * This event is emitted when a job was submitted to the scheduler.
+ */
+ public data class JobSubmitted(
+ override val service: WorkflowService,
+ public val job: Job
+ ) : WorkflowEvent()
+
+ /**
* This event is emitted when a job has become active.
*/
public data class JobStarted(
override val service: WorkflowService,
- public val job: Job,
- public val time: Long
+ public val job: Job
) : WorkflowEvent()
/**
@@ -48,8 +56,7 @@ public sealed class WorkflowEvent {
*/
public data class JobFinished(
override val service: WorkflowService,
- public val job: Job,
- public val time: Long
+ public val job: Job
) : WorkflowEvent()
/**
@@ -58,8 +65,7 @@ public sealed class WorkflowEvent {
public data class TaskStarted(
override val service: WorkflowService,
public val job: Job,
- public val task: Task,
- public val time: Long
+ public val task: Task
) : WorkflowEvent()
/**
@@ -68,7 +74,6 @@ public sealed class WorkflowEvent {
public data class TaskFinished(
override val service: WorkflowService,
public val job: Job,
- public val task: Task,
- public val time: Long
+ public val task: Task
) : WorkflowEvent()
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
index 319a8b85..b24f80da 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
@@ -34,7 +34,7 @@ import java.util.*
*/
public interface WorkflowService {
/**
- * Thie events emitted by the workflow scheduler.
+ * The events emitted by the workflow scheduler.
*/
public val events: Flow<WorkflowEvent>
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 90cf5b99..62955a11 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,10 +35,12 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
@@ -66,6 +68,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
+ val tracer = EventTracer(clock)
val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
@@ -74,6 +77,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
StageWorkflowService(
testScope,
clock,
+ tracer,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -113,9 +117,11 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope.advanceUntilIdle()
- assertNotEquals(0, jobsSubmitted, "No jobs submitted")
- assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
- assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
- assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")
+ assertAll(
+ { assertNotEquals(0, jobsSubmitted, "No jobs submitted") },
+ { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") },
+ { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") },
+ { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") }
+ )
}
}
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index 935a18d0..e6f42574 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -32,4 +32,5 @@ include(":opendc-runner-web")
include(":opendc-simulator:opendc-simulator-core")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
+include(":opendc-trace:opendc-trace-core")
include(":opendc-utils")