summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
Diffstat (limited to 'simulator')
-rw-r--r--simulator/buildSrc/src/main/kotlin/Versions.kt5
-rw-r--r--simulator/gradle.properties6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt9
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt174
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt131
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt59
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt12
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt26
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt129
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt2
-rw-r--r--simulator/opendc-runner-web/build.gradle.kts2
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt161
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt22
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt1
-rw-r--r--simulator/opendc-telemetry/build.gradle.kts21
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts34
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts38
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt)32
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt99
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt4
-rw-r--r--simulator/settings.gradle.kts2
26 files changed, 619 insertions, 366 deletions
diff --git a/simulator/buildSrc/src/main/kotlin/Versions.kt b/simulator/buildSrc/src/main/kotlin/Versions.kt
index d1df6284..6aa9260b 100644
--- a/simulator/buildSrc/src/main/kotlin/Versions.kt
+++ b/simulator/buildSrc/src/main/kotlin/Versions.kt
@@ -49,6 +49,11 @@ public class Versions(private val project: Project) {
val kotlinxCoroutines by version(name = "kotlinx-coroutines")
+ val otelApi by version(name = "opentelemetry-api")
+ val otelApiMetrics by version(name = "opentelemetry-api-metrics")
+ val otelSdk by version(name = "opentelemetry-sdk")
+ val otelSdkMetrics by version(name = "opentelemetry-sdk-metrics")
+
/**
* Obtain the version for the specified [dependency][name].
diff --git a/simulator/gradle.properties b/simulator/gradle.properties
index 8d41408c..99b08bb2 100644
--- a/simulator/gradle.properties
+++ b/simulator/gradle.properties
@@ -28,6 +28,12 @@ kotlin-logging.version = 2.0.6
slf4j.version = 1.7.30
log4j.version = 2.14.1
+# Dependencies (Telemetry)
+opentelemetry-api.version = 1.0.1
+opentelemetry-api-metrics.version = 1.0.1-alpha
+opentelemetry-sdk.version = 1.0.1
+opentelemetry-sdk-metrics.version = 1.0.1-alpha
+
# Dependencies (CLI)
clikt.version = 3.1.0
progressbar.version = 0.9.0
diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
index 41b506b2..909e2dcd 100644
--- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -32,6 +32,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-compute:opendc-compute-api"))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging")
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 28cef83a..4bc0ba78 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.service
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
@@ -35,11 +36,6 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
- * The events emitted by the service.
- */
- public val events: Flow<ComputeServiceEvent>
-
- /**
* The hosts that are used by the compute service.
*/
public val hosts: Set<Host>
@@ -80,10 +76,11 @@ public interface ComputeService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
allocationPolicy: AllocationPolicy,
schedulingQuantum: Long = 300000,
): ComputeService {
- return ComputeServiceImpl(context, clock, allocationPolicy, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum)
}
}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f9bd7fbc..26a34ad9 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,18 +22,16 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.utils.TimerScheduler
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -48,6 +46,7 @@ import kotlin.math.max
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
+ private val meter: Meter,
private val allocationPolicy: AllocationPolicy,
private val schedulingQuantum: Long
) : ComputeService, HostListener {
@@ -101,24 +100,70 @@ internal class ComputeServiceImpl(
*/
private val servers = mutableMapOf<UUID, InternalServer>()
- private var submittedVms: Int = 0
- private var queuedVms: Int = 0
- private var runningVms: Int = 0
- private var finishedVms: Int = 0
- private var unscheduledVms: Int = 0
-
private var maxCores = 0
private var maxMemory = 0L
/**
+ * The number of servers that have been submitted to the service for provisioning.
+ */
+ private val _submittedServers = meter.longCounterBuilder("servers.submitted")
+ .setDescription("Number of start requests")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that failed to be scheduled.
+ */
+ private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
+ .setDescription("Number of unscheduled servers")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are waiting to be provisioned.
+ */
+ private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
+ .setDescription("Number of servers waiting to be provisioned")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are waiting to be provisioned.
+ */
+ private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
+ .setDescription("Number of servers currently running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that have finished running.
+ */
+ private val _finishedServers = meter.longCounterBuilder("servers.finished")
+ .setDescription("Number of servers that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of hosts registered at the compute service.
+ */
+ private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
+ .setDescription("Number of hosts")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of available hosts registered at the compute service.
+ */
+ private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
+ .setDescription("Number of available hosts")
+ .setUnit("1")
+ .build()
+
+ /**
* The allocation logic to use.
*/
private val allocationLogic = allocationPolicy()
- override val events: Flow<ComputeServiceEvent>
- get() = _events
- private val _events = EventFlow<ComputeServiceEvent>()
-
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
@@ -204,18 +249,6 @@ internal class ComputeServiceImpl(
start: Boolean
): Server {
check(!isClosed) { "Client is closed" }
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- )
- )
val uid = UUID(clock.millis(), random.nextLong())
val server = InternalServer(
@@ -269,14 +302,23 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
+ _availableHostCount.add(1)
availableHosts += hv
}
+ _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
- host.removeListener(this)
+ val view = hostToView.remove(host)
+ if (view != null) {
+ if (availableHosts.remove(view)) {
+ _availableHostCount.add(-1)
+ }
+ host.removeListener(this)
+ _hostCount.add(-1)
+ }
}
override fun close() {
@@ -288,6 +330,8 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server)
queue.add(request)
+ _submittedServers.add(1)
+ _waitingServers.add(1)
requestSchedulingCycle()
return request
}
@@ -332,6 +376,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
+ _waitingServers.add(-1)
continue
}
@@ -341,21 +386,10 @@ internal class ComputeServiceImpl(
logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" }
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- --queuedVms,
- ++unscheduledVms
- )
- )
-
// Remove the incoming image
queue.poll()
+ _waitingServers.add(-1)
+ _unscheduledServers.add(1)
logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
@@ -370,6 +404,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
+ _waitingServers.add(-1)
logger.info { "Assigned server $server to host $host." }
@@ -384,19 +419,6 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
} catch (e: Throwable) {
logger.error("Failed to deploy VM", e)
@@ -427,21 +449,9 @@ internal class ComputeServiceImpl(
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
+ _availableHostCount.add(1)
}
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
// Re-schedule on the new machine
requestSchedulingCycle()
}
@@ -450,19 +460,7 @@ internal class ComputeServiceImpl(
val hv = hostToView[host] ?: return
availableHosts -= hv
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
+ _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -480,23 +478,15 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ if (newState == ServerState.RUNNING) {
+ _runningServers.add(1)
+ } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
activeServers -= server
+ _runningServers.add(-1)
+ _finishedServers.add(1)
+
val hv = hostToView[host]
if (hv != null) {
hv.provisionedCores -= server.flavor.cpuCount
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index e1482152..45a306aa 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.service
import io.mockk.*
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.TestCoroutineScope
@@ -55,7 +56,8 @@ internal class ComputeServiceTest {
scope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(scope)
val policy = AvailableMemoryAllocationPolicy()
- service = ComputeService(scope.coroutineContext, clock, policy)
+ val meter = MeterProvider.noop().get("opendc-compute")
+ service = ComputeService(scope.coroutineContext, clock, meter, policy)
}
@AfterEach
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 2e4191cc..89784803 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -59,7 +59,7 @@ public class SimHost(
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context)
+ override val scope: CoroutineScope = CoroutineScope(context + Job())
/**
* The logger instance of this server.
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 2d0da1bf..b2d7cc30 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -47,4 +47,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 6f99a44e..4f48bba7 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,6 +22,11 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
@@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
@@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Clock
+import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
@@ -130,12 +136,13 @@ public fun createTraceReader(
/**
* Construct the environment for a simulated compute service..
*/
-public fun createComputeService(
- coroutineScope: CoroutineScope,
+public suspend fun withComputeService(
clock: Clock,
+ meter: Meter,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
-): ComputeService {
+ allocationPolicy: AllocationPolicy,
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -144,7 +151,7 @@ public fun createComputeService(
def.name,
def.model,
def.meta,
- coroutineScope.coroutineContext,
+ coroutineContext,
clock,
SimFairShareHypervisorProvider(),
def.powerModel
@@ -152,26 +159,33 @@ public fun createComputeService(
}
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy)
+ ComputeService(coroutineContext, clock, meter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
}
- return scheduler
+ try {
+ block(this, scheduler)
+ } finally {
+ scheduler.close()
+ hosts.forEach(SimHost::close)
+ }
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun attachMonitor(
- coroutineScope: CoroutineScope,
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
clock: Clock,
+ metricProducer: MetricProducer,
scheduler: ComputeService,
- monitor: ExperimentMonitor
-): MonitorResults {
- val results = MonitorResults()
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -181,7 +195,7 @@ public fun attachMonitor(
}
})
- host.events
+ monitorJobs += host.events
.onEach { event ->
when (event) {
is HostEvent.SliceFinished -> monitor.reportHostSlice(
@@ -197,37 +211,81 @@ public fun attachMonitor(
)
}
}
- .launchIn(coroutineScope)
+ .launchIn(this)
- (host as SimHost).machine.powerDraw
+ monitorJobs += (host as SimHost).machine.powerDraw
.onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(coroutineScope)
+ .launchIn(this)
}
- scheduler.events
- .onEach { event ->
- when (event) {
- is ComputeServiceEvent.MetricsAvailable -> {
- results.submittedVms = event.totalVmCount
- results.queuedVms = event.waitingVmCount
- results.runningVms = event.activeVmCount
- results.finishedVms = event.inactiveVmCount
- results.unscheduledVms = event.failedVmCount
- monitor.reportProvisionerMetrics(clock.millis(), event)
- }
+ val reader = CoroutineMetricReader(
+ this, listOf(metricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+
+ val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ return CompletableResultCode.ofSuccess()
}
- }
- .launchIn(coroutineScope)
- return results
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = 5 * 60 * 1000
+ )
+
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
+ }
}
-public class MonitorResults {
+public class ComputeMetrics {
public var submittedVms: Int = 0
public var queuedVms: Int = 0
public var runningVms: Int = 0
- public var finishedVms: Int = 0
public var unscheduledVms: Int = 0
+ public var finishedVms: Int = 0
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+ val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ val res = ComputeMetrics()
+ try {
+ // Hack to extract metrics from OpenTelemetry SDK
+ res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Failed to collect metrics" }
+ }
+ return res
}
/**
@@ -242,12 +300,17 @@ public suspend fun processTrace(
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
try {
coroutineScope {
while (reader.hasNext()) {
val entry = reader.next()
- delay(max(0, entry.start - clock.millis()))
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
val server = client.newServer(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 46e0bcb9..2921daba 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -22,11 +22,13 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.*
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
@@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
* Perform a single trial for this portfolio.
*/
@OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seeder = Random(repeat)
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createAllocationPolicy(seeder)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val meter = meterProvider.get("opendc-compute")
+
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy
- )
-
+ withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
@@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${monitorResults.submittedVms}")
- logger.debug("FAIL=${monitorResults.unscheduledVms}")
- logger.debug("QUEUED=${monitorResults.queuedVms}")
- logger.debug("RUNNING=${monitorResults.runningVms}")
- logger.debug("FINISHED=${monitorResults.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
}
- try {
- testScope.advanceUntilIdle()
- } finally {
- monitor.close()
- }
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 14cc06dc..a57c8d78 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import java.io.Closeable
@@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index c9d57a98..0e675d87 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerWriter.write(
ProvisionerEvent(
time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
)
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a836b334..fd906f4d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,18 +22,18 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.AfterEach
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.model.Workload
@@ -45,8 +45,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
@@ -54,16 +54,6 @@ import java.time.Clock
@OptIn(ExperimentalCoroutinesApi::class)
class CapelinIntegrationTest {
/**
- * The [TestCoroutineScope] to use.
- */
- private lateinit var testScope: TestCoroutineScope
-
- /**
- * The simulation clock to use.
- */
- private lateinit var clock: Clock
-
- /**
* The monitor used to keep track of the metrics.
*/
private lateinit var monitor: TestExperimentReporter
@@ -73,37 +63,28 @@ class CapelinIntegrationTest {
*/
@BeforeEach
fun setUp() {
- testScope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(testScope)
-
monitor = TestExperimentReporter()
}
- /**
- * Tear down the experimental environment.
- */
- @AfterEach
- fun tearDown() = testScope.cleanupTestCoroutines()
-
@Test
- fun testLarge() {
+ fun testLarge() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: ComputeService
- lateinit var monitorResults: MonitorResults
+ lateinit var monitorResults: ComputeMetrics
- testScope.launch {
- scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -118,28 +99,28 @@ class CapelinIntegrationTest {
null
}
- monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
- monitor.close()
}
- runSimulation()
+ monitorResults = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
+ { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
{ assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
{ assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
@@ -148,38 +129,35 @@ class CapelinIntegrationTest {
}
@Test
- fun testSmall() {
+ fun testSmall() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy
- )
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- yield()
-
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
-
- scheduler.close()
- monitor.close()
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
}
- runSimulation()
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
// Note that these values have been verified beforehand
assertAll(
@@ -191,11 +169,6 @@ class CapelinIntegrationTest {
}
/**
- * Run the simulation.
- */
- private fun runSimulation() = testScope.advanceUntilIdle()
-
- /**
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
index 98e25be9..225200c9 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.sc18
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.service.ComputeService
@@ -100,6 +101,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
val compute = ComputeService(
testScope.coroutineContext,
clock,
+ MeterProvider.noop().get("opendc-compute"),
NumberOfActiveServersAllocationPolicy(),
)
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index d07fe7a6..fcc78a83 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -48,4 +48,6 @@ dependencies {
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}")
runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}")
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 68ea3fb9..706efdc9 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,9 +34,13 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
@@ -45,17 +49,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.experiments.capelin.attachMonitor
-import org.opendc.experiments.capelin.createComputeService
-import org.opendc.experiments.capelin.createFailureDomain
+import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.processTrace
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import kotlin.coroutines.coroutineContext
import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -206,86 +207,86 @@ public class RunnerCli : CliktCommand(name = "runner") {
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
): WebExperimentMonitor.Result {
- val seed = repeat
- val traceDocument = scenario.get("trace", Document::class.java)
- val workloadName = traceDocument.getString("traceId")
- val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
- val seeder = Random(seed)
- val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job]))
- val clock = DelayControllerClockAdapter(testScope)
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val operational = scenario.get("operational", Document::class.java)
- val allocationPolicy =
- when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
-
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
- listOf(traceReader),
- performanceInterferenceModel,
- Workload(workloadName, workloadFraction),
- seed
- )
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
- val environment = TopologyParser(topologies, topologyId)
val monitor = WebExperimentMonitor()
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy
- )
-
- val failureDomain = if (operational.getBoolean("failuresEnabled")) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- testScope,
- clock,
- seeder.nextInt(),
- operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
+ try {
+ runBlockingTest {
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val clock = DelayControllerClockAdapter(this)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val metricProducer = meterProvider as MetricProducer
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
+
+ withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ val failureDomain = if (failureFrequency > 0) {
+ logger.debug { "ENABLING failures" }
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" }
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
- failureDomain?.cancel()
- scheduler.close()
- }
+ failureDomain?.cancel()
+ }
- try {
- testScope.advanceUntilIdle()
- testScope.uncaughtExceptions.forEach { it.printStackTrace() }
- } finally {
- monitor.close()
- testScope.cancel()
+ val monitorResults = collectMetrics(metricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Experiment failed" }
}
return monitor.getResult()
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index a8ac6c10..fcd43ea7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerMetrics = AggregateProvisionerMetrics(
- max(event.totalVmCount, provisionerMetrics.vmTotalCount),
- max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(event.activeVmCount, provisionerMetrics.vmActiveCount),
- max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+ max(totalVmCount, provisionerMetrics.vmTotalCount),
+ max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(activeVmCount, provisionerMetrics.vmActiveCount),
+ max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(failedVmCount, provisionerMetrics.vmFailedCount),
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 1c0f94fd..2127b066 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -108,8 +108,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
.launchIn(this)
launch {
- source.consume(consumer)
- job.cancel()
+ try {
+ source.consume(consumer)
+ } finally {
+ job.cancel()
+ }
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f86c4198..830ff70e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -71,6 +71,7 @@ public class SimBareMetalMachine(
override fun close() {
super.close()
+
scheduler.close()
scope.cancel()
}
diff --git a/simulator/opendc-telemetry/build.gradle.kts b/simulator/opendc-telemetry/build.gradle.kts
new file mode 100644
index 00000000..7edfd134
--- /dev/null
+++ b/simulator/opendc-telemetry/build.gradle.kts
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
diff --git a/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
new file mode 100644
index 00000000..d9a4b4dd
--- /dev/null
+++ b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Telemetry API for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api("io.opentelemetry:opentelemetry-api:${versions.otelApi}")
+ api("io.opentelemetry:opentelemetry-api-metrics:${versions.otelApiMetrics}")
+}
diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
new file mode 100644
index 00000000..350a0f74
--- /dev/null
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Telemetry SDK for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ api("io.opentelemetry:opentelemetry-sdk:${versions.otelSdk}")
+ api("io.opentelemetry:opentelemetry-sdk-metrics:${versions.otelSdkMetrics}")
+
+ implementation("io.github.microutils:kotlin-logging")
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt
index 193008a7..86f6647e 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt
@@ -20,28 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.compute.service
+package org.opendc.telemetry.sdk
+
+import io.opentelemetry.sdk.common.Clock
/**
- * An event that is emitted by the [ComputeService].
+ * An adapter class that bridges a [java.time.Clock] to a [Clock]
*/
-public sealed class ComputeServiceEvent {
- /**
- * The service that has emitted the event.
- */
- public abstract val provisioner: ComputeService
+public class OtelClockAdapter(private val clock: java.time.Clock) : Clock {
+ override fun now(): Long = clock.millis()
- /**
- * An event emitted for writing metrics.
- */
- public data class MetricsAvailable(
- override val provisioner: ComputeService,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
- ) : ComputeServiceEvent()
+ override fun nanoTime(): Long = clock.millis() * 1_000_000L
}
+
+/**
+ * Convert the specified [java.time.Clock] to a [Clock].
+ */
+public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this)
diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
new file mode 100644
index 00000000..9ee16fac
--- /dev/null
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.sdk.metrics.export
+
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import mu.KotlinLogging
+import java.util.*
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
+
+/**
+ * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
+ * export interval.
+ *
+ * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock.
+ *
+ * @param scope The [CoroutineScope] to run the reader in.
+ * @param producers The metric producers to gather metrics from.
+ * @param exporter The export to export the metrics to.
+ * @param exportInterval The export interval in milliseconds.
+ */
+public class CoroutineMetricReader(
+ scope: CoroutineScope,
+ private val producers: List<MetricProducer>,
+ private val exporter: MetricExporter,
+ private val exportInterval: Long = 60_000
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+ private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
+
+ /**
+ * The metric reader job.
+ */
+ private val readerJob = scope.launch {
+ while (isActive) {
+ delay(exportInterval)
+
+ val metrics = mutableListOf<MetricData>()
+ for (producer in producers) {
+ metrics.addAll(producer.collectAllMetrics())
+ }
+ chan.send(Collections.unmodifiableList(metrics))
+ }
+ }
+
+ /**
+ * The exporter job runs in the background to actually export the metrics.
+ */
+ private val exporterJob = chan.consumeAsFlow()
+ .onEach { metrics ->
+ suspendCoroutine<Unit> { cont ->
+ try {
+ val result = exporter.export(metrics)
+ result.whenComplete {
+ if (!result.isSuccess) {
+ logger.trace { "Exporter failed" }
+ }
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
+ cont.resume(Unit)
+ }
+ }
+ }
+ .launchIn(scope)
+
+ override fun close() {
+ readerJob.cancel()
+ exporterJob.cancel()
+ }
+}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
index 91b22266..5e276edf 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -85,7 +86,8 @@ internal class StageWorkflowSchedulerIntegrationTest {
)
}
- val compute = ComputeService(testScope.coroutineContext, clock, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+ val meter = MeterProvider.noop().get("opendc-compute")
+ val compute = ComputeService(testScope.coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
hosts.forEach { compute.addHost(it) }
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index d5603664..73b4d8e7 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -39,5 +39,7 @@ include(":opendc-simulator:opendc-simulator-resources")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-trace:opendc-trace-core")
+include(":opendc-telemetry:opendc-telemetry-api")
+include(":opendc-telemetry:opendc-telemetry-sdk")
include(":opendc-harness")
include(":opendc-utils")