diff options
Diffstat (limited to 'simulator')
31 files changed, 1092 insertions, 32 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts index 9682b50f..ac2dc78d 100644 --- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts @@ -29,6 +29,7 @@ plugins { dependencies { api(project(":opendc-core")) + api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging:1.7.9") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt new file mode 100644 index 00000000..c1802e64 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.trace.core.Event +import java.util.* + +/** + * This event is emitted when a hypervisor has become available. + */ +public class HypervisorAvailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt new file mode 100644 index 00000000..1fea21ea --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.trace.core.Event +import java.util.* + +/** + * This event is emitted when a hypervisor has become unavailable. + */ +public class HypervisorUnavailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt new file mode 100644 index 00000000..662068dd --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.trace.core.Event + +/** + * This event is emitted when a virtual machine has successfully been scheduled on a hypervisor. + */ +public class VmScheduledEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt new file mode 100644 index 00000000..96103129 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.trace.core.Event + +/** + * This event is emitted when a virtual machine has stopped running. + */ +public class VmStoppedEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt new file mode 100644 index 00000000..f6b71e22 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.image.Image +import org.opendc.trace.core.Event + +/** + * This event is emitted when a virtual machine is submitted to the provisioning service. + */ +public class VmSubmissionEvent(public val name: String, public val image: Image, public val flavor: Flavor) : Event() diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt new file mode 100644 index 00000000..d0e5c102 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt.service.events + +import org.opendc.trace.core.Event + +/** + * An event that is emitted when the submission is deemed to be invalid. + */ +public class VmSubmissionInvalidEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index e83370d7..0144fd69 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -38,7 +38,9 @@ import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException import org.opendc.compute.core.virt.driver.VirtDriver import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.virt.service.VirtProvisioningService +import org.opendc.compute.core.virt.service.events.* import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.trace.core.EventTracer import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* @@ -51,7 +53,8 @@ public class SimVirtProvisioningService( private val coroutineScope: CoroutineScope, private val clock: Clock, private val provisioningService: ProvisioningService, - public val allocationPolicy: AllocationPolicy + public val allocationPolicy: AllocationPolicy, + private val tracer: EventTracer ) : VirtProvisioningService { /** * The logger instance to use. @@ -136,6 +139,8 @@ public class SimVirtProvisioningService( image: Image, flavor: Flavor ): Server { + tracer.commit(VmSubmissionEvent(name, image, flavor)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -191,6 +196,8 @@ public class SimVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { + tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -231,6 +238,8 @@ public class SimVirtProvisioningService( imageInstance.server = server imageInstance.continuation.resume(server) + tracer.commit(VmScheduledEvent(imageInstance.name)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -252,6 +261,8 @@ public class SimVirtProvisioningService( if (event.server.state == ServerState.SHUTOFF) { logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } + tracer.commit(VmStoppedEvent(event.server.name)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -310,6 +321,8 @@ public class SimVirtProvisioningService( hypervisors[server] = hv } + tracer.commit(HypervisorAvailableEvent(server.uid)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -333,6 +346,8 @@ public class SimVirtProvisioningService( val hv = hypervisors[server] ?: return availableHypervisors -= hv + tracer.commit(HypervisorUnavailableEvent(hv.uid)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, @@ -359,6 +374,8 @@ public class SimVirtProvisioningService( hv.driver = hypervisor availableHypervisors += hv + tracer.commit(HypervisorAvailableEvent(hv.uid)) + eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt index 3786eebf..1221c7d3 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt @@ -31,6 +31,7 @@ import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.StageWorkflowService import org.opendc.workflows.service.WorkflowEvent import org.opendc.workflows.service.WorkflowSchedulerMode @@ -59,6 +60,7 @@ public fun main(args: Array<String>) { val token = Channel<Boolean>() val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) @@ -67,6 +69,7 @@ public fun main(args: Array<String>) { StageWorkflowService( this, clock, + tracer, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 09f44199..f939738d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -52,6 +52,7 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.failures.FaultInjector +import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock import kotlin.math.ln @@ -140,7 +141,8 @@ public suspend fun createProvisioner( coroutineScope: CoroutineScope, clock: Clock, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy + allocationPolicy: AllocationPolicy, + eventTracer: EventTracer ): Pair<ProvisioningService, SimVirtProvisioningService> { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] @@ -148,7 +150,7 @@ public suspend fun createProvisioner( // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy) + val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer) // Wait for the hypervisors to be spawned delay(10) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt index 660fc882..8d8d608d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt @@ -37,6 +37,7 @@ import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import java.io.File import kotlin.random.Random @@ -102,12 +103,15 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) parent.parent.parent.bufferSize ) + val tracer = EventTracer(clock) + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( this, clock, environment, - allocationPolicy + allocationPolicy, + tracer ) val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 9c44edfc..72a2484a 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -48,6 +48,7 @@ import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock @@ -97,13 +98,15 @@ class Sc20IntegrationTest { val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) testScope.launch { val res = createProvisioner( this, clock, environmentReader, - allocationPolicy + allocationPolicy, + tracer ) val bareMetalProvisioner = res.first scheduler = res.second @@ -160,13 +163,15 @@ class Sc20IntegrationTest { val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) testScope.launch { val res = createProvisioner( this, clock, environmentReader, - allocationPolicy + allocationPolicy, + tracer ) scheduler = res.second diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 2117b675..7796019a 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -50,6 +50,7 @@ import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import java.io.File import kotlin.coroutines.coroutineContext import kotlin.random.Random @@ -238,13 +239,15 @@ public class RunnerCli : CliktCommand(name = "runner") { val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = TopologyParser(topologies, topologyId) val monitor = WebExperimentMonitor() + val tracer = EventTracer(clock) testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( this, clock, environment, - allocationPolicy + allocationPolicy, + tracer ) val failureDomain = if (operational.getBoolean("failuresEnabled")) { diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-trace/build.gradle.kts new file mode 100644 index 00000000..a1a751a2 --- /dev/null +++ b/simulator/opendc-trace/build.gradle.kts @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts new file mode 100644 index 00000000..3db6669a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Event tracing library for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}") +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt new file mode 100644 index 00000000..1f4bb267 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * Base class for events reported by the OpenDC tracing library. + */ +public abstract class Event(timestamp: Long = Long.MIN_VALUE) { + /** + * The timestamp at which the event has occurred. + */ + public var timestamp: Long = timestamp + internal set +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt new file mode 100644 index 00000000..ac2b5e9b --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * A stream of [Event]s. + */ +public interface EventStream : AutoCloseable { + /** + * Register the specified [action] to be performed on every event in the stream. + */ + public fun onEvent(action: (Event) -> Unit) + + /** + * Register the specified [action] to be performed on events of type [E]. + */ + public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) + + /** + * Register the specified [action] to be performed on errors. + */ + public fun onError(action: (Throwable) -> Unit) + + /** + * Register the specified [action] to be performed when the stream is closed. + */ + public fun onClose(action: Runnable) + + /** + * Unregister the specified [action]. + * + * @return `true` if an action was unregistered, `false` otherwise. + */ + public fun remove(action: Any): Boolean + + /** + * Start the processing of events in the current coroutine. + * + * @throws IllegalStateException if the stream was already started. + */ + public suspend fun start() + + /** + * Release all resources associated with this stream. + * + * @throws IllegalStateException if the stream was already stopped. + */ + public override fun close() +} + +/** + * Register the specified [action] to be performed on events of type [E]. + */ +public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) { + onEvent(E::class.java, action) +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt new file mode 100644 index 00000000..4f978f4f --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +import org.opendc.trace.core.internal.EventTracerImpl +import java.time.Clock + +/** + * An [EventTracer] is responsible for recording the events that occur in a system. + */ +public interface EventTracer : AutoCloseable { + /** + * The [Clock] used to measure the timestamp and duration of the events. + */ + public val clock: Clock + + /** + * Determine whether the specified [Event] class is currently enabled in any of the active recordings. + * + * @return `true` if the event is enabled, `false` otherwise. + */ + public fun isEnabled(type: Class<out Event>): Boolean + + /** + * Commit the specified [event] to the appropriate event streams. + */ + public fun commit(event: Event) + + /** + * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer]. + */ + public fun openRecording(): RecordingStream + + /** + * Terminate the lifecycle of the [EventTracer] and close its associated event streams. + */ + public override fun close() + + public companion object { + /** + * Construct a new [EventTracer] instance. + * + * @param clock The [Clock] used to measure the timestamps. + */ + @JvmName("create") + public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock) + } +} + +/** + * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings. + * + * @return `true` if the event is enabled, `false` otherwise. + */ +public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java) + +/** + * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams. + */ +public inline fun <reified E : Event> EventTracer.commit(block: () -> E) { + if (isEnabled<E>()) { + commit(block()) + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt new file mode 100644 index 00000000..84dcc61a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.sendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow + +/** + * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun EventStream.asFlow(): Flow<Event> = callbackFlow { + onEvent { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + onClose { channel.close() } + awaitClose { this@asFlow.close() } +} + +/** + * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow { + onEvent { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + start() +} + +/** + * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow { + onEvent<E> { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + onClose { channel.close() } + awaitClose { this@asTypedFlow.close() } +} + +/** + * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow { + onEvent<E> { sendBlocking(it) } + onError { cancel(CancellationException("API error", it)) } + start() +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt new file mode 100644 index 00000000..f49e7c49 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core + +/** + * A recording stream that produces events from an [EventTracer]. + */ +public interface RecordingStream : EventStream { + /** + * Enable recording of the specified event [type]. + */ + public fun enable(type: Class<out Event>) + + /** + * Disable recording of the specified event [type] + */ + public fun disable(type: Class<out Event>) +} + +/** + * Enable recording of events of type [E]. + */ +public inline fun <reified E : Event> RecordingStream.enable() { + enable(E::class.java) +} + +/** + * Disable recording of events of type [E]. + */ +public inline fun <reified E : Event> RecordingStream.disable() { + enable(E::class.java) +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt new file mode 100644 index 00000000..1887ad7a --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.trace.core.Event +import org.opendc.trace.core.EventStream +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume + +/** + * Base implementation of the [EventStream] implementation. + */ +internal abstract class AbstractEventStream : EventStream { + /** + * The state of the stream. + */ + protected var state = StreamState.Pending + + /** + * The event actions to dispatch to. + */ + private val eventActions = mutableListOf<EventDispatcher>() + + /** + * The error actions to use. + */ + private val errorActions = mutableListOf<(Throwable) -> Unit>() + + /** + * The close actions to use. + */ + private val closeActions = mutableListOf<Runnable>() + + /** + * The continuation that is invoked when the stream closes. + */ + private var cont: Continuation<Unit>? = null + + /** + * Dispatch the specified [event] to this stream. + */ + fun dispatch(event: Event) { + val actions = eventActions + + // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) + for (action in actions) { + if (!action.accepts(event)) { + continue + } + + try { + action(event) + } catch (e: Exception) { + handleError(e) + } + } + } + + /** + * Handle the specified [throwable] that occurred while dispatching an event. + */ + private fun handleError(throwable: Throwable) { + val actions = errorActions + + // Default exception handler + if (actions.isEmpty()) { + throwable.printStackTrace() + return + } + + for (action in actions) { + action(throwable) + } + } + + override fun onEvent(action: (Event) -> Unit) { + eventActions += EventDispatcher(null, action) + } + + override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) { + @Suppress("UNCHECKED_CAST") // This cast must succeed + eventActions += EventDispatcher(type, action as (Event) -> Unit) + } + + override fun onError(action: (Throwable) -> Unit) { + errorActions += action + } + + override fun onClose(action: Runnable) { + closeActions += action + } + + override fun remove(action: Any): Boolean { + return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action) + } + + override suspend fun start() { + check(state == StreamState.Pending) { "Stream has already started/closed" } + + state = StreamState.Started + + return suspendCancellableCoroutine { cont -> this.cont = cont } + } + + override fun close() { + if (state != StreamState.Closed) { + return + } + + state = StreamState.Closed + cont?.resume(Unit) + + val actions = closeActions + for (action in actions) { + action.run() + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt new file mode 100644 index 00000000..8b6de75e --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event + +/** + * The [Dispatcher] is responsible for dispatching events onto configured actions. + */ +internal class Dispatcher { + /** + * The event actions to dispatch to. + */ + private val eventActions = mutableListOf<EventDispatcher>() + + /** + * The error actions to use. + */ + private val errorActions = mutableListOf<(Throwable) -> Unit>() + + /** + * Dispatch the specified [event]. + */ + fun dispatch(event: Event) { + val actions = eventActions + + // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) + for (action in actions) { + if (!action.accepts(event)) { + continue + } + + try { + action(event) + } catch (e: Exception) { + handleError(e) + } + } + } + + /** + * Handle the specified [throwable] that occurred while dispatching an event. + */ + private fun handleError(throwable: Throwable) { + val actions = errorActions + + // Default exception handler + if (actions.isEmpty()) { + throwable.printStackTrace() + return + } + + for (action in actions) { + action(throwable) + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt new file mode 100644 index 00000000..b2a662eb --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event + +/** + * A dispatcher responsible for conditionally dispatching an event. + */ +internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) { + /** + * Determine whether this dispatcher accepts the specified event. + */ + fun accepts(event: Event): Boolean { + return type == null || type.isAssignableFrom(event.javaClass) + } + + /** + * Invoke the specified [event] on this action. + */ + operator fun invoke(event: Event) { + action(event) + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt new file mode 100644 index 00000000..e85d0779 --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +import org.opendc.trace.core.Event +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.RecordingStream +import java.lang.reflect.Modifier +import java.time.Clock +import java.util.* + +/** + * Default implementation of the [EventTracer] interface. + */ +internal class EventTracerImpl(override val clock: Clock) : EventTracer { + /** + * The set of enabled events. + */ + private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>() + + /** + * The event streams created by the tracer. + */ + private val streams = WeakHashMap<Stream, Unit>() + + /** + * A flag to indicate that the stream is closed. + */ + private var isClosed: Boolean = false + + override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type) + + override fun commit(event: Event) { + val type = event.javaClass + + // Assign timestamp if not set + if (event.timestamp == Long.MIN_VALUE) { + event.timestamp = clock.millis() + } + + if (!isEnabled(type) || isClosed) { + return + } + + val streams = enabledEvents[type] ?: return + for (stream in streams) { + stream.dispatch(event) + } + } + + override fun openRecording(): RecordingStream = Stream() + + override fun close() { + isClosed = true + + val streams = streams + for ((stream, _) in streams) { + stream.close() + } + + enabledEvents.clear() + } + + /** + * Enable the specified [type] for the given [stream]. + */ + private fun enableFor(type: Class<out Event>, stream: Stream) { + val res = enabledEvents.computeIfAbsent(type) { mutableListOf() } + res.add(stream) + } + + /** + * Disable the specified [type] for the given [stream]. + */ + private fun disableFor(type: Class<out Event>, stream: Stream) { + enabledEvents[type]?.remove(stream) + } + + /** + * The [RecordingStream] associated with this [EventTracer] implementation. + */ + private inner class Stream : AbstractEventStream(), RecordingStream { + /** + * The set of enabled events for this stream. + */ + private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>() + + init { + streams[this] = Unit + } + + override fun enable(type: Class<out Event>) { + validateEventClass(type) + + if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) { + enableFor(type, this) + } + } + + override fun disable(type: Class<out Event>) { + validateEventClass(type) + + if (enabledEvents.remove(type) != null && state == StreamState.Started) { + disableFor(type, this) + } + } + + override suspend fun start() { + val enabledEvents = enabledEvents + for ((event, _) in enabledEvents) { + enableFor(event, this) + } + + super.start() + } + + override fun close() { + val enabledEvents = enabledEvents + for ((event, _) in enabledEvents) { + disableFor(event, this) + } + + // Remove this stream from the active streams + streams.remove(this) + + super.close() + } + + /** + * Validate the specified event subclass. + */ + private fun validateEventClass(type: Class<out Event>) { + require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" } + require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" } + } + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt new file mode 100644 index 00000000..9f411e0d --- /dev/null +++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.core.internal + +/** + * The state of a [Stream]. + */ +internal enum class StreamState { + Pending, Started, Closed +} diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index f61bdac6..fa03508c 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) testImplementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 3b4e6eab..91657f83 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import org.opendc.compute.core.Server @@ -33,7 +34,9 @@ import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.utils.flow.EventFlow +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.consumeAsFlow +import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -51,6 +54,7 @@ import java.util.* public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, + internal val tracer: EventTracer, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -168,7 +172,6 @@ public class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> - private val eventFlow = EventFlow<WorkflowEvent>() init { coroutineScope.launch { @@ -185,7 +188,14 @@ public class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override val events: Flow<WorkflowEvent> = eventFlow + override val events: Flow<WorkflowEvent> = tracer.openRecording().let { + it.enable<WorkflowEvent.JobSubmitted>() + it.enable<WorkflowEvent.JobStarted>() + it.enable<WorkflowEvent.JobFinished>() + it.enable<WorkflowEvent.TaskStarted>() + it.enable<WorkflowEvent.TaskFinished>() + it.consumeAsFlow().map { event -> event as WorkflowEvent } + } override suspend fun submit(job: Job) { // J1 Incoming Jobs @@ -209,6 +219,7 @@ public class StageWorkflowService( instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -237,7 +248,7 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) + tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) rootListener.jobStarted(jobInstance) } @@ -307,12 +318,11 @@ public class StageWorkflowService( ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskStarted( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskStarted(task) @@ -325,12 +335,11 @@ public class StageWorkflowService( job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskFinished( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskFinished(task) @@ -357,7 +366,7 @@ public class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) + tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt index dadccb50..bcf93562 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt @@ -22,25 +22,33 @@ package org.opendc.workflows.service +import org.opendc.trace.core.Event import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task /** * An event emitted by the [WorkflowService]. */ -public sealed class WorkflowEvent { +public sealed class WorkflowEvent : Event() { /** * The [WorkflowService] that emitted the event. */ public abstract val service: WorkflowService /** + * This event is emitted when a job was submitted to the scheduler. + */ + public data class JobSubmitted( + override val service: WorkflowService, + public val job: Job + ) : WorkflowEvent() + + /** * This event is emitted when a job has become active. */ public data class JobStarted( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -48,8 +56,7 @@ public sealed class WorkflowEvent { */ public data class JobFinished( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -58,8 +65,7 @@ public sealed class WorkflowEvent { public data class TaskStarted( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() /** @@ -68,7 +74,6 @@ public sealed class WorkflowEvent { public data class TaskFinished( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt index 319a8b85..b24f80da 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt @@ -34,7 +34,7 @@ import java.util.* */ public interface WorkflowService { /** - * Thie events emitted by the workflow scheduler. + * The events emitted by the workflow scheduler. */ public val events: Flow<WorkflowEvent> diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 90cf5b99..62955a11 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,10 +35,12 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy @@ -66,6 +68,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) @@ -74,6 +77,7 @@ internal class StageWorkflowSchedulerIntegrationTest { StageWorkflowService( testScope, clock, + tracer, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -113,9 +117,11 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope.advanceUntilIdle() - assertNotEquals(0, jobsSubmitted, "No jobs submitted") - assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") - assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") - assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") + assertAll( + { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, + { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, + { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, + { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } + ) } } diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 935a18d0..e6f42574 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -32,4 +32,5 @@ include(":opendc-runner-web") include(":opendc-simulator:opendc-simulator-core") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") +include(":opendc-trace:opendc-trace-core") include(":opendc-utils") |
