diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-27 12:48:51 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-27 12:48:51 +0100 |
| commit | bef2b2fc9ab97941613ec4537ebca1eb3fccdee6 (patch) | |
| tree | 4a4c1e0fb2de6aa0a0b9810f9c255e039a7ce47b /simulator/opendc-workflow/opendc-workflow-service | |
| parent | 57ebc8a1c6779d7e7276754838e83f1c026cceb9 (diff) | |
| parent | 5b0eaf76ec00192c755b268b7655f6463c5bc62f (diff) | |
Integrate OpenTelemetry into OpenDC
This pull request resolves issue #91 by adopting the industry-standard OpenTelemetry standard
for collecting traces and metrics in OpenDC simulations:
* Metrics will now be exposed through OpenTelemetry's metrics API
* `opendc-telemetry` provides complementary code to support
gathering telemetry in OpenDC.
**Breaking API Changes**
* `opendc-tracer` has been removed.
* `EventFlow` and all usages of it have been removed.
* `opendc-experiments-sc18` has been removed for now, but a
suitable replacement will follow soon.
Diffstat (limited to 'simulator/opendc-workflow/opendc-workflow-service')
7 files changed, 258 insertions, 275 deletions
diff --git a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index bec18ae9..040a9ff6 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -33,13 +33,14 @@ dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt deleted file mode 100644 index bb2ad6c6..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import org.opendc.trace.core.Event -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task - -/** - * An event emitted by the [WorkflowService]. - */ -public sealed class WorkflowEvent : Event() { - /** - * The [WorkflowService] that emitted the event. - */ - public abstract val service: WorkflowService - - /** - * This event is emitted when a job was submitted to the scheduler. - */ - public data class JobSubmitted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has become active. - */ - public data class JobStarted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has finished processing. - */ - public data class JobFinished( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskStarted( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskFinished( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..d3358ef1 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,9 +22,8 @@ package org.opendc.workflow.service -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter import org.opendc.compute.api.ComputeClient -import org.opendc.trace.core.EventTracer import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -42,14 +41,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow<WorkflowEvent> + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..32191b8f 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,17 +22,11 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.consumeAsFlow -import org.opendc.trace.core.enable import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -43,7 +37,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -52,7 +48,7 @@ import kotlin.coroutines.CoroutineContext public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - internal val tracer: EventTracer, + private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +59,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -106,6 +102,11 @@ public class WorkflowServiceImpl( internal val taskByServer = mutableMapOf<Server, TaskState>() /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf<Job, Continuation<Unit>>() + + /** * The root listener of this scheduler. */ private val rootListener = object : WorkflowSchedulerListener { @@ -151,6 +152,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +216,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow<WorkflowEvent> = tracer.openRecording().let { - it.enable<WorkflowEvent.JobSubmitted>() - it.enable<WorkflowEvent.JobStarted>() - it.enable<WorkflowEvent.JobFinished>() - it.enable<WorkflowEvent.TaskStarted>() - it.enable<WorkflowEvent.TaskFinished>() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +233,24 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + + requestCycle() + } + } - requestCycle() + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,12 +281,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit( - WorkflowEvent.JobStarted( - this, - jobInstance.job - ) - ) + + runningJobs.add(1) rootListener.jobStarted(jobInstance) } @@ -311,18 +357,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - tracer.commit( - WorkflowEvent.TaskStarted( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + runningTasks.add(1) rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -338,13 +377,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task - tracer.commit( - WorkflowEvent.TaskFinished( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + + runningTasks.add(-1) + finishedTasks.add(1) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -371,8 +406,11 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - tracer.commit(WorkflowEvent.JobFinished(this, job.job)) + runningJobs.add(-1) + finishedJobs.add(1) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt deleted file mode 100644 index 2161f5f2..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotEquals -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max - -/** - * Integration test suite for the [WorkflowServiceImpl]. - */ -@DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) -internal class StageWorkflowSchedulerIntegrationTest { - /** - * A large integration test where we check whether all tasks in some trace are executed correctly. - */ - @Test - fun testTrace() { - var jobsSubmitted = 0L - var jobsStarted = 0L - var jobsFinished = 0L - var tasksStarted = 0L - var tasksFinished = 0L - - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - - val scheduler = let { - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - testScope.coroutineContext, - clock, - SimSpaceSharedHypervisorProvider() - ) - } - - val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) - - hosts.forEach { compute.addHost(it) } - - WorkflowService( - testScope.coroutineContext, - clock, - tracer, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - } - - testScope.launch { - scheduler.events - .onEach { event -> - when (event) { - is WorkflowEvent.JobStarted -> jobsStarted++ - is WorkflowEvent.JobFinished -> jobsFinished++ - is WorkflowEvent.TaskStarted -> tasksStarted++ - is WorkflowEvent.TaskFinished -> tasksFinished++ - } - } - .collect() - } - - testScope.launch { - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - - while (reader.hasNext()) { - val entry = reader.next() - jobsSubmitted++ - delay(max(0, entry.start - clock.millis())) - scheduler.submit(entry.workload) - } - } - - testScope.advanceUntilIdle() - - assertAll( - { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) }, - { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, - { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, - { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, - { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } - ) - } -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt new file mode 100644 index 00000000..46c0d835 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.format.environment.sc18.Sc18EnvironmentReader +import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import kotlin.math.max + +/** + * Integration test suite for the [WorkflowServiceImpl]. + */ +@DisplayName("WorkflowServiceImpl") +@OptIn(ExperimentalCoroutinesApi::class) +internal class WorkflowServiceIntegrationTest { + /** + * A large integration test where we check whether all tasks in some trace are executed correctly. + */ + @Test + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + coroutineContext, + clock, + MeterProvider.noop().get("opendc-compute-simulator"), + SimSpaceSharedHypervisorProvider() + ) + } + + val meter = MeterProvider.noop().get("opendc-compute") + val compute = ComputeService(coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } + + val scheduler = WorkflowService( + coroutineContext, + clock, + meterProvider.get("opendc-workflow"), + compute.newClient(), + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + + val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + var offset = Long.MIN_VALUE + + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) + launch { + scheduler.run(entry.workload) + } + } + } + + hosts.forEach(SimHost::close) + scheduler.close() + compute.close() + + val metrics = collectMetrics(meterProvider as MetricProducer) + + assertAll( + { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, + { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + ) + } + + class WorkflowMetrics { + var jobsSubmitted = 0L + var jobsActive = 0L + var jobsFinished = 0L + var tasksSubmitted = 0L + var tasksActive = 0L + var tasksFinished = 0L + } + + /** + * Collect the metrics of the workflow service. + */ + private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = WorkflowMetrics() + res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 + return res + } +} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..edcf41ed 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml @@ -28,6 +28,9 @@ </Console> </Appenders> <Loggers> + <Logger name="org.opendc.workflow" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> <Root level="warn"> <AppenderRef ref="Console"/> </Root> |
