summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-14 14:41:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 16:48:53 +0200
commit3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch)
tree9cad172501154d01b00a1e5c45d94d7e2f1e5698 /opendc-workflow
parent5f0b6b372487d79594cf59010822e160f351e0be (diff)
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario.
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt8
3 files changed, 15 insertions, 9 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index d3358ef1..a0248a93 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,7 +22,7 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
- * @param meter The meter to use.
+ * @param meterProvider The meter provider to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- meter,
+ meterProvider,
compute,
mode,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 5329143d..a0fd3fad 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import mu.KotlinLogging
@@ -48,7 +49,7 @@ import kotlin.coroutines.resume
public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -67,6 +68,11 @@ public class WorkflowServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to collect metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.workflow.service")
+
+ /**
* The incoming jobs ready to be processed by the scheduler.
*/
internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 07433d1f..74316437 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import java.time.Duration
import java.util.*
/**
@@ -79,24 +80,23 @@ internal class WorkflowServiceTest {
emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ MeterProvider.noop(),
hvProvider,
)
}
- val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)
+ val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
hosts.forEach { compute.addHost(it) }
val scheduler = WorkflowService(
coroutineContext,
clock,
- meterProvider.get("opendc-workflow"),
+ meterProvider,
compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,