summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gradle/libs.versions.toml6
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt159
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt81
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt509
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt305
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt (renamed from opendc-experiments/opendc-experiments-energy21/build.gradle.kts)35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt134
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt256
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt77
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt124
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt14
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt86
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt95
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt86
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt222
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt262
-rw-r--r--opendc-experiments/opendc-experiments-energy21/.gitignore3
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt208
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf14
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt3
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts1
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt7
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt26
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt5
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt30
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt8
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt448
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt113
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt14
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt81
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt51
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt33
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt28
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt37
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt52
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt83
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt8
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt87
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt8
-rw-r--r--settings.gradle.kts1
57 files changed, 2476 insertions, 1658 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index ddede2e8..3f0e180b 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -13,9 +13,9 @@ kotlinx-coroutines = "1.5.1"
ktor = "1.6.3"
log4j = "2.14.1"
mockk = "1.12.0"
-opentelemetry-main = "1.5.0"
-opentelemetry-metrics = "1.5.0-alpha"
-opentelemetry-semconv = "1.5.0-alpha"
+opentelemetry-main = "1.6.0"
+opentelemetry-metrics = "1.6.0-alpha"
+opentelemetry-semconv = "1.6.0-alpha"
parquet = "1.12.0"
progressbar = "0.9.0"
sentry = "5.1.2"
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f1c055d4..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,9 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -35,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -42,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -63,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -106,69 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.counterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.counterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.upDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _runningServers = meter.upDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that have finished running.
+ * The number of scheduling attempts.
*/
- private val _finishedServers = meter.counterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of hosts registered at the compute service.
+ * The response time of the service.
*/
- private val _hostCount = meter.upDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of available hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _availableHostCount = meter.upDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
-
- /**
- * The response time of the service.
- */
- private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
- .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
- .ofLongs()
- .setUnit("ms")
- .build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -181,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -308,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -338,8 +327,7 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -365,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -385,7 +375,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -397,10 +387,10 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
server.state = ServerState.TERMINATED
continue
@@ -413,8 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
- _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -429,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -453,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -488,16 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.ERROR) {
- _runningServers.add(-1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index d036ec00..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -61,8 +61,7 @@ internal class ComputeServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 28fd8217..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,8 +99,8 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,8 +158,8 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,8 +221,8 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index cad051e6..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -40,5 +40,6 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index a1cc3390..793db907 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -25,13 +25,17 @@ package org.opendc.compute.simulator
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
@@ -43,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -59,7 +63,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- private val meter: Meter,
+ meterProvider: MeterProvider,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -82,6 +86,11 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the simulated host.
+ */
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
+
+ /**
* The event listeners registered with this host.
*/
private val listeners = mutableListOf<HostListener>()
@@ -94,32 +103,29 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
+ private val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- _totalWork.add(requestedWork)
- _grantedWork.add(grantedWork)
- _overcommittedWork.add(overcommittedWork)
- _interferedWork.add(interferedWork)
- _cpuDemand.record(cpuDemand)
- _cpuUsage.record(cpuUsage)
- _powerUsage.record(machine.powerDraw)
-
- reportTime()
+ _cpuDemand = cpuDemand
+ _cpuUsage = cpuUsage
+
+ collectTime()
}
}
)
+ private var _cpuUsage = 0.0
+ private var _cpuDemand = 0.0
/**
* The virtual machines running on the hypervisor.
@@ -139,121 +145,23 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The total number of guests.
+ * The [GuestListener] that listens for guest events.
*/
- private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.upDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU demand of the host.
- */
- private val _cpuDemand = meter.histogramBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU usage of the host.
- */
- private val _cpuUsage = meter.histogramBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The power usage of the host.
- */
- private val _powerUsage = meter.histogramBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The total amount of work supplied to the CPU.
- */
- private val _totalWork = meter.counterBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The work performed by the CPU.
- */
- private val _grantedWork = meter.counterBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount not performed by the CPU due to overcommitment.
- */
- private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of work not performed by the CPU due to interference.
- */
- private val _interferedWork = meter.counterBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("host.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The uptime of the host.
- */
- private val _upTime = meter.counterBuilder("host.time.up")
- .setDescription("The uptime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The downtime of the host.
- */
- private val _downTime = meter.counterBuilder("host.time.down")
- .setDescription("The downtime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
init {
// Launch hypervisor onto machine
scope.launch {
try {
+ _bootTime = clock.millis()
_state = HostState.UP
machine.run(this@SimHost.hypervisor, emptyMap())
} catch (_: CancellationException) {
@@ -265,29 +173,48 @@ public class SimHost(
_state = HostState.DOWN
}
}
- }
-
- private var _lastReport = clock.millis()
-
- private fun reportTime() {
- if (!scope.isActive)
- return
-
- val now = clock.millis()
- val duration = now - _lastReport
-
- _totalTime.add(duration)
- when (_state) {
- HostState.UP -> _upTime.add(duration)
- HostState.DOWN -> _downTime.add(duration)
- }
-
- // Track time of guests
- for (guest in guests.values) {
- guest.reportTime()
- }
- _lastReport = now
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback(::collectPowerTotal)
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectTime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
@@ -301,8 +228,17 @@ public class SimHost(
override suspend fun spawn(server: Server, start: Boolean) {
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- _guests.add(1)
- Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
}
if (start) {
@@ -326,7 +262,6 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- _guests.add(-1)
guest.terminate()
}
@@ -339,7 +274,7 @@ public class SimHost(
}
override fun close() {
- reportTime()
+ reset()
scope.cancel()
machine.close()
}
@@ -347,22 +282,35 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
+ reset()
+
for (guest in guests.values) {
guest.fail()
}
}
public suspend fun recover() {
- reportTime()
+ collectTime()
_state = HostState.UP
+ _bootTime = clock.millis()
+
for (guest in guests.values) {
guest.start()
}
}
/**
+ * Reset the machine.
+ */
+ private fun reset() {
+ collectTime()
+
+ _state = HostState.DOWN
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ }
+
+ /**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
@@ -374,147 +322,168 @@ public class SimHost(
return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStart(vm: Guest) {
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val STATE_KEY = AttributeKey.stringKey("state")
- private fun onGuestStop(vm: Guest) {
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to collect the guest counts on this host.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- var state: ServerState = ServerState.TERMINATED
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("guest.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The uptime of the guest.
- */
- private val _runningTime = meter.counterBuilder("guest.time.running")
- .setDescription("The uptime of the guest")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The time the guest is in an error state.
- */
- private val _errorTime = meter.counterBuilder("guest.time.error")
- .setDescription("The time the guest is in an error state")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ for ((_, guest) in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ else -> invalid++
}
}
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
- suspend fun terminate() {
- stop()
- machine.close()
- state = ServerState.DELETED
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
+
+ for (guest in guests.values) {
+ guest.collectCpuLimit(result)
}
+ }
- suspend fun fail() {
- if (state != ServerState.RUNNING) {
- return
- }
- stop()
- state = ServerState.ERROR
+ private var _lastCpuTimeCallback = clock.millis()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastCpuTimeCallback
+
+ try {
+ collectCpuTime(duration, result)
+ } finally {
+ _lastCpuTimeCallback = now
}
+ }
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- try {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- job = null
- }
- }
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = this.model.cpuCount
+ val d = coreCount / _cpuLimit
+
+ val counters = hypervisor.counters
+ val grantedWork = counters.actual
+ val overcommittedWork = counters.overcommit
+ val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+ val lostTime = (interferedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(lostTime, _lostState)
+
+ for (guest in guests.values) {
+ guest.collectCpuTime(duration, result)
}
+ }
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ private var _lastPowerCallback = clock.millis()
+ private var _totalPower = 0.0
+
+ /**
+ * Helper function to collect the total power usage of the machine.
+ */
+ private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastPowerCallback
+
+ _totalPower += duration / 1000.0 * machine.powerDraw
+ result.observe(_totalPower)
+
+ _lastPowerCallback = now
+ }
+
+ private var _lastReport = clock.millis()
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(result: ObservableLongMeasurement? = null) {
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ try {
+ collectTime(duration, result)
+ } finally {
+ _lastReport = now
}
+ }
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- onGuestStop(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == HostState.UP) {
+ _uptime += duration
+ } else if (state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var _lastReport = clock.millis()
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
- fun reportTime() {
- if (state == ServerState.DELETED)
- return
+ for (guest in guests.values) {
+ guest.collectUptime(duration, result)
+ }
+ }
- val now = clock.millis()
- val duration = now - _lastReport
+ private var _bootTime = Long.MIN_VALUE
- _totalTime.add(duration)
- when (state) {
- ServerState.RUNNING -> _runningTime.add(duration)
- ServerState.ERROR -> _errorTime.add(duration)
- else -> {}
- }
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
- _lastReport = now
+ for (guest in guests.values) {
+ guest.collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..90562e2f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimAbstractMachine
+import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Terminate the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun terminate() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated machine.
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Run the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "idle")
+ .build()
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of the guest.
+ */
+ fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = server.flavor.cpuCount
+ val d = coreCount / _cpuLimit
+
+ var grantedWork = 0.0
+ var overcommittedWork = 0.0
+
+ for (cpu in (machine as SimAbstractMachine).cpus) {
+ val counters = cpu.counters
+ grantedWork += counters.actual
+ overcommittedWork += counters.overcommit
+ }
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(0, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
index cc58e5f1..e6d0fdad 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -20,28 +20,19 @@
* SOFTWARE.
*/
-description = "Experiments for the OpenDC Energy work"
+package org.opendc.compute.simulator.internal
-/* Build configuration */
-plugins {
- `experiment-conventions`
- `testing-conventions`
-}
-
-dependencies {
- api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcExperiments.opendcExperimentsCapelin)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcTelemetry.opendcTelemetryCompute)
- implementation(libs.kotlin.logging)
- implementation(libs.config)
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
- implementation(libs.parquet) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 31215e9a..9c879e5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,11 +23,9 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -44,15 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
private lateinit var machineModel: MachineModel
@@ -71,24 +74,29 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val virtDriver = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -130,26 +138,17 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
- metricsByName["cpu.work.overcommit"]?.let {
- overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -175,9 +174,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, requestedWork, "Requested work does not match") },
- { assertEquals(2107200, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(659, activeTime, "Active time does not match") },
+ { assertEquals(2342, idleTime, "Idle time does not match") },
+ { assertEquals(638, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -187,27 +186,32 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var totalTime = 0L
- var downTime = 0L
- var guestTotalTime = 0L
- var guestDownTime = 0L
-
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val host = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -233,35 +237,23 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
}
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["host.time.total"]?.let {
- totalTime = it.longSumData.points.first().value
- }
- metricsByName["host.time.down"]?.let {
- downTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.total"]?.let {
- guestTotalTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.error"]?.let {
- guestDownTime = it.longSumData.points.first().value
+
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
}
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -289,12 +281,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226039, requestedWork, "Total time does not match") },
- { assertEquals(1086039, grantedWork, "Down time does not match") },
- { assertEquals(1200001, totalTime, "Total time does not match") },
- { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
- { assertEquals(5000, downTime, "Down time does not match") },
- { assertEquals(5000, guestDownTime, "Guest down time does not match") },
+ { assertEquals(2661, idleTime, "Idle time does not match") },
+ { assertEquals(339, activeTime, "Active time does not match") },
+ { assertEquals(1195001, uptime, "Uptime does not match") },
+ { assertEquals(5000, downtime, "Downtime does not match") },
+ { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 8227bca9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import kotlinx.coroutines.*
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.ReplayScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
-import org.opendc.compute.service.scheduler.weights.RamWeigher
-import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- interferenceModel: VmInterferenceModel? = null,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Process the trace.
- */
-suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- monitor: ComputeMonitor? = null,
-) {
- val client = scheduler.newClient()
- val watcher = object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
- }
- }
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
- server.watch(watcher)
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
-
-/**
- * Create a [MeterProvider] instance for the experiment.
- */
-fun createMeterProvider(clock: Clock): MeterProvider {
- return SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-}
-
-/**
- * Create a [ComputeScheduler] for the experiment.
- */
-fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
- val cpuAllocationRatio = 16.0
- val ramAllocationRatio = 1.5
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = 1.0))
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = -1.0))
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = -1.0))
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = emptyList(),
- subsetSize = Int.MAX_VALUE,
- random = java.util.Random(seeder.nextLong())
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 82794471..6261ebbf 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,10 +23,7 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.io.FileInputStream
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
+import kotlin.math.roundToLong
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) {
/**
* Perform a single trial for this portfolio.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
-
- val meterProvider = createMeterProvider(clock)
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
} else {
listOf(workload.name)
}
-
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
-
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(FileInputStream(config.getString("interference-model")))
@@ -126,49 +122,44 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
+ val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
+ val failureModel =
+ if (operationalPhenomena.failureFrequency > 0)
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ else
+ null
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ performanceInterferenceModel
+ )
val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- )
- } else {
- null
- }
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
-
- faultInjector?.close()
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
logger.debug {
- "Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.activeHostCount}"
+ "Scheduler " +
+ "Success=${monitorResults.attemptsSuccess} " +
+ "Failure=${monitorResults.attemptsFailure} " +
+ "Error=${monitorResults.attemptsError} " +
+ "Pending=${monitorResults.serversPending} " +
+ "Active=${monitorResults.serversActive}"
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
new file mode 100644
index 00000000..a4676f31
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("AvroUtils")
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
+
+/**
+ * Schema for UUID type.
+ */
+internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+
+/**
+ * Schema for timestamp type.
+ */
+internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+
+/**
+ * Helper function to make a [Schema] field optional.
+ */
+internal fun Schema.optional(): Schema {
+ return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index c5cb80e2..e3d15c3b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -25,11 +25,12 @@ package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,50 +39,94 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
*/
-public open class ParquetDataWriter<in T>(
- private val path: File,
+abstract class ParquetDataWriter<in T>(
+ path: File,
private val schema: Schema,
- private val converter: (T, GenericData.Record) -> Unit,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
+ bufferSize: Int = 4096
+) : AutoCloseable {
/**
* The logging instance to use.
*/
private val logger = KotlinLogging.logger {}
/**
- * The writer to write the Parquet file.
+ * The queue of commands to process.
*/
- private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
- * The queue of commands to process.
+ * An exception to be propagated to the actual writer.
*/
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+ private var exception: Throwable? = null
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = this.toString()) { run() }
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ process(writer, queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ process(writer, data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder.build()
+ }
+
+ /**
+ * Convert the specified [data] into a Parquet record.
+ */
+ protected abstract fun convert(builder: GenericRecordBuilder, data: T)
/**
* Write the specified metrics to the database.
*/
- public fun write(event: T) {
- queue.put(Action.Write(event))
+ fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
}
/**
* Signal the writer to stop.
*/
- public override fun close() {
- queue.put(Action.Stop)
+ override fun close() {
+ writerThread.interrupt()
writerThread.join()
}
@@ -90,38 +135,11 @@ public open class ParquetDataWriter<in T>(
}
/**
- * Start the writer thread.
+ * Process the specified [data] to be written to the Parquet file.
*/
- override fun run() {
- try {
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- val record = GenericData.Record(schema)
- @Suppress("UNCHECKED_CAST")
- converter(action.data as T, record)
- writer.write(record)
- }
- }
- }
- } catch (e: Throwable) {
- logger.error("Writer failed", e)
- } finally {
- writer.close()
- }
- }
-
- public sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- public object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- public data class Write<out T>(val data: T) : Action()
+ private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ writer.write(builder.build())
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 79b84e9d..b057e932 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
private val hostWriter = ParquetHostDataWriter(
File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+
private val serviceWriter = ParquetServiceDataWriter(
File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+ override fun record(data: ServerData) {
+ serverWriter.write(data)
+ }
+
override fun record(data: HostData) {
hostWriter.write(data)
}
@@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int
override fun close() {
hostWriter.close()
serviceWriter.close()
+ serverWriter.close()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 8912c12e..58388cb1 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -25,6 +25,9 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.HostData
import java.io.File
@@ -32,42 +35,67 @@ import java.io.File
* A Parquet event writer for [HostData]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "host-writer"
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
- public companion object {
- private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
- record.put("host_id", data.host.name)
- record.put("state", data.host.state.name)
- record.put("timestamp", data.timestamp)
- record.put("total_work", data.totalWork)
- record.put("granted_work", data.grantedWork)
- record.put("overcommitted_work", data.overcommittedWork)
- record.put("interfered_work", data.interferedWork)
- record.put("cpu_usage", data.cpuUsage)
- record.put("cpu_demand", data.cpuDemand)
- record.put("power_draw", data.powerDraw)
- record.put("instance_count", data.instanceCount)
- record.put("cores", data.host.model.cpuCount)
+ builder["host_id"] = data.host.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
}
- private val schema: Schema = SchemaBuilder
+ builder["cpu_count"] = data.host.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.host.memCapacity
+
+ builder["power_total"] = data.powerTotal
+
+ builder["guests_terminated"] = data.guestsTerminated
+ builder["guests_running"] = data.guestsRunning
+ builder["guests_error"] = data.guestsError
+ builder["guests_invalid"] = data.guestsInvalid
+ }
+
+ override fun toString(): String = "host-writer"
+
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("host")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("requested_work").type().longType().noDefault()
- .name("granted_work").type().longType().noDefault()
- .name("overcommitted_work").type().longType().noDefault()
- .name("interfered_work").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("instance_count").type().intType().noDefault()
- .name("cores").type().intType().noDefault()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .requiredDouble("power_total")
+ .requiredInt("guests_terminated")
+ .requiredInt("guests_running")
+ .requiredInt("guests_error")
+ .requiredInt("guests_invalid")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..43b5f469
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import java.io.File
+import java.util.*
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
+ builder["server_id"] = data.server.id
+ builder["host_id"] = data.host?.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
+ }
+ builder["scheduling_latency"] = data.schedulingLatency
+
+ builder["cpu_count"] = data.server.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.server.memCapacity
+ }
+
+ override fun toString(): String = "server-writer"
+
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("server")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("server_id").type(UUID_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredLong("scheduling_latency")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 36d630f3..2928f445 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
@@ -32,34 +32,34 @@ import java.io.File
* A Parquet event writer for [ServiceData]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "service-writer"
+ override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+ builder["hosts_up"] = data.hostsUp
+ builder["hosts_down"] = data.hostsDown
+ builder["servers_pending"] = data.serversPending
+ builder["servers_active"] = data.serversActive
+ builder["attempts_success"] = data.attemptsSuccess
+ builder["attempts_failure"] = data.attemptsFailure
+ builder["attempts_error"] = data.attemptsError
+ }
- public companion object {
- private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
- record.put("timestamp", data.timestamp)
- record.put("host_total_count", data.hostCount)
- record.put("host_available_count", data.activeHostCount)
- record.put("instance_total_count", data.instanceCount)
- record.put("instance_active_count", data.runningInstanceCount)
- record.put("instance_inactive_count", data.finishedInstanceCount)
- record.put("instance_waiting_count", data.queuedInstanceCount)
- record.put("instance_failed_count", data.failedInstanceCount)
- }
+ override fun toString(): String = "service-writer"
- private val schema: Schema = SchemaBuilder
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("service")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("instance_total_count").type().intType().noDefault()
- .name("instance_active_count").type().intType().noDefault()
- .name("instance_inactive_count").type().intType().noDefault()
- .name("instance_waiting_count").type().intType().noDefault()
- .name("instance_failed_count").type().intType().noDefault()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("hosts_up")
+ .requiredInt("hosts_down")
+ .requiredInt("servers_pending")
+ .requiredInt("servers_active")
+ .requiredInt("attempts_success")
+ .requiredInt("attempts_failure")
+ .requiredInt("attempts_error")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
new file mode 100644
index 00000000..3b7c3f0f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
new file mode 100644
index 00000000..065a8c93
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.MachineDef
+import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to manage a [ComputeService] simulation.
+ */
+class ComputeServiceSimulator(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ scheduler: ComputeScheduler,
+ machines: List<MachineDef>,
+ private val failureModel: FailureModel? = null,
+ interferenceModel: VmInterferenceModel? = null,
+ hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+) : AutoCloseable {
+ /**
+ * The [ComputeService] that has been configured by the manager.
+ */
+ val service: ComputeService
+
+ /**
+ * The [MetricProducer]s that are used by the [ComputeService] and the simulated hosts.
+ */
+ val producers: List<MetricProducer>
+ get() = _metricProducers
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The [SimResourceInterpreter] to simulate the hosts.
+ */
+ private val interpreter = SimResourceInterpreter(context, clock)
+
+ /**
+ * The hosts that belong to this class.
+ */
+ private val hosts = mutableSetOf<SimHost>()
+
+ init {
+ val (service, serviceMeterProvider) = createService(scheduler)
+ this._metricProducers.add(serviceMeterProvider)
+ this.service = service
+
+ for (def in machines) {
+ val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
+ this._metricProducers.add(hostMeterProvider)
+ hosts.add(host)
+ this.service.addHost(host)
+ }
+ }
+
+ /**
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ */
+ suspend fun run(reader: TraceReader<SimWorkload>) {
+ val injector = failureModel?.createInjector(context, clock, service)
+ val client = service.newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ while (reader.hasNext()) {
+ val entry = reader.next()
+
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (entry.start - offset) - clock.millis()))
+
+ launch {
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta + mapOf("workload" to workload)
+ )
+
+ // Wait for the server to reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
+
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ reader.close()
+ client.close()
+ }
+ }
+
+ override fun close() {
+ service.close()
+
+ for (host in hosts) {
+ host.close()
+ }
+
+ hosts.clear()
+ }
+
+ /**
+ * Construct a [ComputeService] instance.
+ */
+ private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val service = ComputeService(context, clock, meterProvider, scheduler)
+ return service to meterProvider
+ }
+
+ /**
+ * Construct a [SimHost] instance for the specified [MachineDef].
+ */
+ private fun createHost(
+ def: MachineDef,
+ hypervisorProvider: SimHypervisorProvider,
+ interferenceModel: VmInterferenceModel? = null
+ ): Pair<SimHost, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(HOST_ID, def.uid.toString())
+ .put(HOST_NAME, def.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, def.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val host = SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ context,
+ interpreter,
+ meterProvider,
+ hypervisorProvider,
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
+ )
+
+ return host to meterProvider
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
new file mode 100644
index 00000000..83393896
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
new file mode 100644
index 00000000..89b4a31c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.experiments.capelin
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import org.opendc.experiments.capelin.util.FailureModel
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+import kotlin.random.Random
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService
+ ): HostFaultInjector {
+ val rng = Well19937c(seed)
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
+
+/**
+ * Obtain the [HostFaultInjector] to use for the experiments.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun createFaultInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ seed: Int,
+ failureInterval: Double
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 44cf92a8..727530e3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
/**
@@ -60,11 +61,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,45 +82,46 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
- { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
- { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
- { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should be in the queue") },
+ { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect energy usage" } },
)
}
@@ -120,41 +131,41 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -164,10 +175,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,34 +184,39 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -214,53 +226,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -284,18 +286,20 @@ class CapelinIntegrationTest {
}
class TestExperimentReporter : ComputeMonitor {
- var totalWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
- var totalInterferedWork = 0L
- var totalPowerDraw = 0.0
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
- totalPowerDraw += data.powerDraw
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
}
}
}
diff --git a/opendc-experiments/opendc-experiments-energy21/.gitignore b/opendc-experiments/opendc-experiments-energy21/.gitignore
deleted file mode 100644
index 55da79f8..00000000
--- a/opendc-experiments/opendc-experiments-energy21/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-input/
-output/
-.ipynb_checkpoints
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
deleted file mode 100644
index d9194969..00000000
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.energy21
-
-import com.typesafe.config.ConfigFactory
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
-import mu.KotlinLogging
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
-import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
-import org.opendc.harness.dsl.Experiment
-import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.*
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
-import java.io.File
-import java.time.Clock
-import java.util.*
-
-/**
- * Experiments for the OpenDC project on Energy modeling.
- */
-public class EnergyExperiment : Experiment("Energy Modeling 2021") {
- /**
- * The logger for this portfolio instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The configuration to use.
- */
- private val config = ConfigFactory.load().getConfig("opendc.experiments.energy21")
-
- /**
- * The traces to test.
- */
- private val trace by anyOf("solvinity")
-
- /**
- * The power models to test.
- */
- private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION)
-
- override fun doRun(repeat: Int): Unit = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
- weighers = listOf(),
- subsetSize = Int.MAX_VALUE
- )
-
- val meterProvider: MeterProvider = createMeterProvider(clock)
- val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
- val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
-
- withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
- }
-
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
- logger.debug {
- "Finish SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
- }
- }
-
- /**
- * Construct the environment for a simulated compute service..
- */
- public suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- scheduler: ComputeScheduler,
- block: suspend CoroutineScope.(ComputeService) -> Unit
- ): Unit = coroutineScope {
- val model = createMachineModel()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = List(64) { id ->
- SimHost(
- UUID(0, id.toLong()),
- "node-$id",
- model,
- emptyMap(),
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- PerformanceScalingGovernor(),
- powerModel.driver
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
- }
-
- /**
- * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
- */
- private fun createMachineModel(): MachineModel {
- val node = ProcessingNode("AMD", "am64", "EPYC 7742", 64)
- val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
- val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
-
- return MachineModel(cpus, memory)
- }
-
- /**
- * The power models to test.
- */
- public enum class PowerModelType {
- CUBIC {
- override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4))
- },
-
- LINEAR {
- override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4))
- },
-
- SQRT {
- override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4))
- },
-
- SQUARE {
- override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4))
- },
-
- INTERPOLATION {
- override val driver: PowerDriver = SimplePowerDriver(
- InterpolationPowerModel(
- listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0)
- )
- )
- },
-
- MSE {
- override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4))
- },
-
- ASYMPTOTIC {
- override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false))
- },
-
- ASYMPTOTIC_DVFS {
- override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true))
- };
-
- public abstract val driver: PowerDriver
- }
-}
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf b/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf
deleted file mode 100644
index 263da0fe..00000000
--- a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf
+++ /dev/null
@@ -1,14 +0,0 @@
-# Default configuration for the energy experiments
-opendc.experiments.energy21 {
- # Path to the directory containing the input traces
- trace-path = input/traces
-
- # Path to the output directory to write the results to
- output-path = output
-}
-
-opendc.experiments.capelin {
- env-path = input/environments/
- trace-path = input/traces/
- output-path = output
-}
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
index 650416f5..3312d6c0 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.time.Duration
import java.util.*
import kotlin.math.max
@@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") {
val delayInjector = StochasticDelayInjector(coldStartModel, Random())
val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) }
val service =
- FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000))
+ FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)))
val client = service.newClient()
coroutineScope {
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index 63bed8bc..6f4fcc9b 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7e716a34..1d5331cb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
import org.opendc.faas.service.deployer.FunctionDeployer
@@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param meter The meter to report metrics to.
+ * @param meterProvider The [MeterProvider] to create a [Meter] with.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
@@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
): FaaSService {
- return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy)
+ return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index a1cb1dbf..54df2b59 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter
import io.opentelemetry.api.metrics.BoundLongHistogram
import io.opentelemetry.api.metrics.BoundLongUpDownCounter
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.faas.service.deployer.FunctionInstance
import java.util.*
@@ -43,9 +44,14 @@ public class FunctionObject(
meta: Map<String, Any>
) : AutoCloseable {
/**
- * The function identifier attached to the metrics.
+ * The attributes of this function.
*/
- private val functionId = AttributeKey.stringKey("function")
+ public val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.FAAS_ID, uid.toString())
+ .put(ResourceAttributes.FAAS_NAME, name)
+ .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
+ .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" })
+ .build()
/**
* The total amount of function invocations received by the function.
@@ -54,7 +60,7 @@ public class FunctionObject(
.setDescription("Number of function invocations")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that could be handled directly.
@@ -63,7 +69,7 @@ public class FunctionObject(
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that were delayed due to function deployment.
@@ -72,7 +78,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that failed.
@@ -81,7 +87,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of instances for this function.
@@ -90,7 +96,7 @@ public class FunctionObject(
.setDescription("Number of active function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of idle instances for this function.
@@ -99,7 +105,7 @@ public class FunctionObject(
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function waited.
@@ -109,7 +115,7 @@ public class FunctionObject(
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function was running.
@@ -119,7 +125,7 @@ public class FunctionObject(
.setDescription("Time the function was running")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The instances associated with this function.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index 1e224ed1..63dbadc7 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext
public class FunctionTerminationPolicyFixed(
context: CoroutineContext,
clock: Clock,
- public val timeout: Long
+ public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
@@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout) { instance.close() }
+ scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index ccf9a5d9..3b560cd3 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
@@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException
internal class FaaSServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ private val meterProvider: MeterProvider,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy
@@ -70,6 +71,11 @@ internal class FaaSServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] that collects the metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.faas.service")
+
+ /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 6b99684a..1612e10b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,8 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -59,8 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -69,8 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -81,8 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -95,8 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -109,8 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -123,8 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -135,8 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -150,8 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -163,9 +154,8 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 64f2551b..0dc9ba87 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -43,6 +43,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Duration
/**
* A test suite for the [FaaSService] implementation under simulated conditions.
@@ -64,14 +65,13 @@ internal class SimFaaSServiceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) {
override suspend fun invoke() {}
})
val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload }
val service = FaaSService(
- coroutineContext, clock, meter, deployer, RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000)
+ coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(),
+ FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 266db0dd..f9db048d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -49,22 +49,22 @@ public abstract class SimAbstractMachine(
/**
* The resources allocated for this machine.
*/
- protected abstract val cpus: List<SimProcessingUnit>
+ public abstract val cpus: List<SimProcessingUnit>
/**
* The memory interface of the machine.
*/
- protected val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
+ public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
/**
* The network interfaces available to the machine.
*/
- protected val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
+ public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
/**
* The network interfaces available to the machine.
*/
- protected val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
+ public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
/**
* The peripherals of the machine.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index af28c346..3b49d515 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -64,7 +64,7 @@ public interface SimHypervisor : SimWorkload {
*/
public fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 8dea0045..1f010338 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -70,14 +70,14 @@ internal class SimHypervisorTest {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- totalRequestedWork += requestedWork
+ totalRequestedWork += totalWork
totalGrantedWork += grantedWork
totalOvercommittedWork += overcommittedWork
}
@@ -128,14 +128,14 @@ internal class SimHypervisorTest {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- totalRequestedWork += requestedWork
+ totalRequestedWork += totalWork
totalGrantedWork += grantedWork
totalOvercommittedWork += overcommittedWork
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index 6a3de9bc..cd8cb57a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -31,7 +31,6 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
new file mode 100644
index 00000000..e9449634
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -0,0 +1,448 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.PointData
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.telemetry.compute.table.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ */
+public class ComputeMetricAggregator {
+ private val _service = ServiceAggregator()
+ private val _hosts = mutableMapOf<String, HostAggregator>()
+ private val _servers = mutableMapOf<String, ServerAggregator>()
+
+ /**
+ * Process the specified [metrics] for this cycle.
+ */
+ public fun process(metrics: Collection<MetricData>) {
+ val service = _service
+ val hosts = _hosts
+ val servers = _servers
+
+ for (metric in metrics) {
+ val resource = metric.resource
+
+ when (metric.name) {
+ // ComputeService
+ "scheduler.hosts" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> service.hostsUp = point.value.toInt()
+ "down" -> service.hostsDown = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.servers" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "pending" -> service.serversPending = point.value.toInt()
+ "active" -> service.serversActive = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.attempts" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[RESULT_KEY]) {
+ "success" -> service.attemptsSuccess = point.value.toInt()
+ "failure" -> service.attemptsFailure = point.value.toInt()
+ "error" -> service.attemptsError = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.latency" -> {
+ for (point in metric.doubleHistogramData.points) {
+ val server = getServer(servers, point) ?: continue
+ server.schedulingLatency = (point.sum / point.count).roundToLong()
+ }
+ }
+
+ // SimHost
+ "system.guests" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "terminated" -> agg.guestsTerminated = point.value.toInt()
+ "running" -> agg.guestsRunning = point.value.toInt()
+                        "error" -> agg.guestsError = point.value.toInt()
+ "invalid" -> agg.guestsInvalid = point.value.toInt()
+ }
+ }
+ }
+ "system.cpu.limit" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.doubleGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.cpuLimit = point.value
+ server.host = agg.host
+ } else {
+ agg.cpuLimit = point.value
+ }
+ }
+ }
+ "system.cpu.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.demand" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.utilization" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+ val state = point.attributes[STATE_KEY]
+ if (server != null) {
+ when (state) {
+ "active" -> server.cpuActiveTime = point.value
+ "idle" -> server.cpuIdleTime = point.value
+ "steal" -> server.cpuStealTime = point.value
+ "lost" -> server.cpuLostTime = point.value
+ }
+ server.host = agg.host
+ } else {
+ when (state) {
+ "active" -> agg.cpuActiveTime = point.value
+ "idle" -> agg.cpuIdleTime = point.value
+ "steal" -> agg.cpuStealTime = point.value
+ "lost" -> agg.cpuLostTime = point.value
+ }
+ }
+ }
+ }
+ "system.power.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.power.total" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerTotal = metric.doubleSumData.points.first().value
+ }
+ "system.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> server.uptime = point.value
+ "down" -> server.downtime = point.value
+ }
+ server.host = agg.host
+ } else {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> agg.uptime = point.value
+ "down" -> agg.downtime = point.value
+ }
+ }
+ }
+ }
+ "system.time.boot" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.bootTime = point.value
+ server.host = agg.host
+ } else {
+ agg.bootTime = point.value
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Collect the data via the [monitor].
+ */
+ public fun collect(now: Instant, monitor: ComputeMonitor) {
+ monitor.record(_service.collect(now))
+
+ for (host in _hosts.values) {
+ monitor.record(host.collect(now))
+ }
+
+ for (server in _servers.values) {
+ monitor.record(server.collect(now))
+ }
+ }
+
+ /**
+ * Obtain the [HostAggregator] for the specified [resource].
+ */
+ private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
+ val id = resource.attributes[HOST_ID]
+ return if (id != null) {
+ hosts.computeIfAbsent(id) { HostAggregator(resource) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Obtain the [ServerAggregator] for the specified [point].
+ */
+ private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
+ val id = point.attributes[ResourceAttributes.HOST_ID]
+ return if (id != null) {
+ servers.computeIfAbsent(id) { ServerAggregator(point.attributes) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ internal class ServiceAggregator {
+ @JvmField var hostsUp = 0
+ @JvmField var hostsDown = 0
+
+ @JvmField var serversPending = 0
+ @JvmField var serversActive = 0
+
+ @JvmField var attemptsSuccess = 0
+ @JvmField var attemptsFailure = 0
+ @JvmField var attemptsError = 0
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): ServiceData = toServiceData(now)
+
+ /**
+ * Convert the aggregator state to an immutable [ServiceData].
+ */
+ private fun toServiceData(now: Instant): ServiceData {
+ return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ internal class HostAggregator(resource: Resource) {
+ /**
+ * The static information about this host.
+ */
+ val host = HostInfo(
+ resource.attributes[HOST_ID]!!,
+ resource.attributes[HOST_NAME] ?: "",
+ resource.attributes[HOST_ARCH] ?: "",
+ resource.attributes[HOST_NCPUS]?.toInt() ?: 0,
+ resource.attributes[HOST_MEM_CAPACITY] ?: 0,
+ )
+
+ @JvmField var guestsTerminated = 0
+ @JvmField var guestsRunning = 0
+ @JvmField var guestsError = 0
+ @JvmField var guestsInvalid = 0
+
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuUsage = 0.0
+ @JvmField var cpuDemand = 0.0
+ @JvmField var cpuUtilization = 0.0
+
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ @JvmField var powerUsage = 0.0
+ @JvmField var powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ @JvmField var uptime = 0L
+ private var previousUptime = 0L
+ @JvmField var downtime = 0L
+ private var previousDowntime = 0L
+ @JvmField var bootTime = Long.MIN_VALUE
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): HostData {
+ val data = toHostData(now)
+
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+ previousPowerTotal = powerTotal
+ previousUptime = uptime
+ previousDowntime = downtime
+
+ guestsTerminated = 0
+ guestsRunning = 0
+ guestsError = 0
+ guestsInvalid = 0
+
+ cpuLimit = 0.0
+ cpuUsage = 0.0
+ cpuDemand = 0.0
+ cpuUtilization = 0.0
+
+ powerUsage = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state to an immutable [HostData] instance.
+ */
+ private fun toHostData(now: Instant): HostData {
+ return HostData(
+ now,
+ host,
+ guestsTerminated,
+ guestsRunning,
+ guestsError,
+ guestsInvalid,
+ cpuLimit,
+ cpuUsage,
+ cpuDemand,
+ cpuUtilization,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime,
+ powerUsage,
+ powerTotal - previousPowerTotal,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
+ )
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ internal class ServerAggregator(attributes: Attributes) {
+ /**
+ * The static information about this server.
+ */
+ val server = ServerInfo(
+ attributes[ResourceAttributes.HOST_ID]!!,
+ attributes[ResourceAttributes.HOST_NAME]!!,
+ attributes[ResourceAttributes.HOST_TYPE]!!,
+ attributes[ResourceAttributes.HOST_ARCH]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+ attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+ attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ var host: HostInfo? = null
+
+ @JvmField var uptime: Long = 0
+ private var previousUptime = 0L
+ @JvmField var downtime: Long = 0
+ private var previousDowntime = 0L
+        @JvmField var bootTime: Long = Long.MIN_VALUE
+ @JvmField var schedulingLatency = 0L
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): ServerData {
+ val data = toServerData(now)
+
+ previousUptime = uptime
+ previousDowntime = downtime
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+
+ host = null
+ cpuLimit = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state into an immutable [ServerData].
+ */
+ private fun toServerData(now: Instant): ServerData {
+ return ServerData(
+ now,
+ server,
+ host,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
+ schedulingLatency,
+ cpuLimit,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime
+ )
+ }
+ }
+
+ private companion object {
+ private val STATE_KEY = AttributeKey.stringKey("state")
+ private val RESULT_KEY = AttributeKey.stringKey("result")
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 95e7ff9e..ea96f721 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -23,126 +23,29 @@
package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.compute.service.driver.Host
-import org.opendc.telemetry.compute.table.HostData
import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ComputeMetricExporter(
- private val clock: Clock,
- private val hosts: Map<String, Host>,
- private val monitor: ComputeMonitor
-) : MetricExporter {
+public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+ /**
+ * A [ComputeMetricAggregator] that actually performs the aggregation.
+ */
+ private val agg = ComputeMetricAggregator()
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
- reportHostMetrics(metrics)
- reportServiceMetrics(metrics)
+ agg.process(metrics)
+ agg.collect(clock.instant(), monitor)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
}
}
- private var lastHostMetrics: Map<String, HBuffer> = emptyMap()
- private val hostMetricsSingleton = HBuffer()
-
- private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hostMetrics = mutableMapOf<String, HBuffer>()
-
- for (metric in metrics) {
- when (metric.name) {
- "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
- "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
- "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v }
- "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
- "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
- "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
- "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
- "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
- "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v }
- "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v }
- }
- }
-
- for ((id, hostMetric) in hostMetrics) {
- val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
- val host = hosts[id] ?: continue
-
- monitor.record(
- HostData(
- clock.millis(),
- host,
- hostMetric.totalWork - lastHostMetric.totalWork,
- hostMetric.grantedWork - lastHostMetric.grantedWork,
- hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
- hostMetric.interferedWork - lastHostMetric.interferedWork,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.instanceCount,
- hostMetric.powerDraw,
- hostMetric.uptime - lastHostMetric.uptime,
- hostMetric.downtime - lastHostMetric.downtime,
- )
- )
- }
-
- lastHostMetrics = hostMetrics
- }
-
- private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
- val points = data.doubleSummaryData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
- }
- }
-
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) {
- val points = data?.longSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- block(hostMetric, point.value)
- }
- }
-
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
- val points = data?.doubleSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- block(hostMetric, point.value)
- }
- }
-
- /**
- * A buffer for host metrics before they are reported.
- */
- private class HBuffer {
- var totalWork: Double = 0.0
- var grantedWork: Double = 0.0
- var overcommittedWork: Double = 0.0
- var interferedWork: Double = 0.0
- var cpuUsage: Double = 0.0
- var cpuDemand: Double = 0.0
- var instanceCount: Int = 0
- var powerDraw: Double = 0.0
- var uptime: Long = 0
- var downtime: Long = 0
- }
-
- private fun reportServiceMetrics(metrics: Collection<MetricData>) {
- monitor.record(extractServiceMetrics(clock.millis(), metrics))
- }
-
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index ec303b37..d51bcab4 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -22,10 +22,6 @@
package org.opendc.telemetry.compute
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.compute.table.ServiceData
@@ -35,16 +31,6 @@ import org.opendc.telemetry.compute.table.ServiceData
*/
public interface ComputeMonitor {
/**
- * This method is invoked when the state of a [Server] changes.
- */
- public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {}
-
- /**
- * This method is invoked when the state of a [Host] changes.
- */
- public fun onStateChange(time: Long, host: Host, newState: HostState) {}
-
- /**
* Record the specified [data].
*/
public fun record(data: ServerData) {}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index d3d983b9..25d346fb 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -24,88 +24,29 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import java.time.Clock
-
-/**
- * Attach the specified monitor to the OpenDC Compute service.
- */
-public suspend fun withMonitor(
- scheduler: ComputeService,
- clock: Clock,
- metricProducer: MetricProducer,
- monitor: ComputeMonitor,
- exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.onStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.onStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor),
- exportInterval
- )
-
- try {
- block(this)
- } finally {
- reader.close()
- }
-}
+import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
}
/**
* Extract a [ServiceData] object from the specified list of metric data.
*/
-public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
- var submittedVms = 0
- var queuedVms = 0
- var unscheduledVms = 0
- var runningVms = 0
- var finishedVms = 0
- var hosts = 0
- var availableHosts = 0
-
- for (metric in metrics) {
- val points = metric.longSumData.points
-
- if (points.isEmpty()) {
- continue
- }
-
- val value = points.first().value.toInt()
- when (metric.name) {
- "servers.submitted" -> submittedVms = value
- "servers.waiting" -> queuedVms = value
- "servers.unscheduled" -> unscheduledVms = value
- "servers.active" -> runningVms = value
- "servers.finished" -> finishedVms = value
- "hosts.total" -> hosts = value
- "hosts.available" -> availableHosts = value
+public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+ lateinit var serviceData: ServiceData
+ val agg = ComputeMetricAggregator()
+ val monitor = object : ComputeMonitor {
+ override fun record(data: ServiceData) {
+ serviceData = data
}
}
- return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms)
+ agg.process(metrics)
+ agg.collect(timestamp, monitor)
+ return serviceData
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
new file mode 100644
index 00000000..7dca6186
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("HostAttributes")
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+
+/**
+ * The identifier of the node hosting virtual machines.
+ */
+public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id")
+
+/**
+ * The name of the node hosting virtual machines.
+ */
+public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name")
+
+/**
+ * The CPU architecture of the host node.
+ */
+public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch")
+
+/**
+ * The number of CPUs in the host node.
+ */
+public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus")
+
+/**
+ * The amount of memory installed in the host node in MiB.
+ */
+public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity")
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index 8e6c34d0..8e787b97 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -22,22 +22,29 @@
package org.opendc.telemetry.compute.table
-import org.opendc.compute.service.driver.Host
+import java.time.Instant
/**
* A trace entry for a particular host.
*/
public data class HostData(
- public val timestamp: Long,
- public val host: Host,
- public val totalWork: Double,
- public val grantedWork: Double,
- public val overcommittedWork: Double,
- public val interferedWork: Double,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val instanceCount: Int,
- public val powerDraw: Double,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val host: HostInfo,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+ val cpuLimit: Double,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val cpuUtilization: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+ val powerUsage: Double,
+ val powerTotal: Double,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
new file mode 100644
index 00000000..d9a5906b
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Information about a host exposed to the telemetry service.
+ */
+public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 2a9fa8a6..c48bff3a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -22,14 +22,22 @@
package org.opendc.telemetry.compute.table
-import org.opendc.compute.api.Server
+import java.time.Instant
/**
* A trace entry for a particular server.
*/
public data class ServerData(
- public val timestamp: Long,
- public val server: Server,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val server: ServerInfo,
+ val host: HostInfo?,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?,
+ val schedulingLatency: Long,
+ val cpuLimit: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
new file mode 100644
index 00000000..b16e5f3d
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Static information about a server exposed to the telemetry service.
+ */
+public data class ServerInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val imageId: String,
+ val imageName: String,
+ val cpuCount: Int,
+ val memCapacity: Long
+)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index f6ff5db5..6db1399d 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -22,16 +22,18 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for the compute service.
*/
public data class ServiceData(
- public val timestamp: Long,
- public val hostCount: Int,
- public val activeHostCount: Int,
- public val instanceCount: Int,
- public val runningInstanceCount: Int,
- public val finishedInstanceCount: Int,
- public val queuedInstanceCount: Int,
- public val failedInstanceCount: Int
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
)
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 9ee16fac..07f0ff7f 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
-import java.util.*
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
+import java.time.Duration
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
@@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine
* @param scope The [CoroutineScope] to run the reader in.
* @param producers The metric producers to gather metrics from.
* @param exporter The export to export the metrics to.
- * @param exportInterval The export interval in milliseconds.
+ * @param exportInterval The export interval.
*/
public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Long = 60_000
+ private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
- private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
/**
- * The metric reader job.
+ * The background job that is responsible for collecting the metrics every cycle.
*/
- private val readerJob = scope.launch {
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+
while (isActive) {
- delay(exportInterval)
+ delay(intervalMs)
val metrics = mutableListOf<MetricData>()
for (producer in producers) {
metrics.addAll(producer.collectAllMetrics())
}
- chan.send(Collections.unmodifiableList(metrics))
- }
- }
- /**
- * The exporter job runs in the background to actually export the metrics.
- */
- private val exporterJob = chan.consumeAsFlow()
- .onEach { metrics ->
- suspendCoroutine<Unit> { cont ->
- try {
- val result = exporter.export(metrics)
- result.whenComplete {
- if (!result.isSuccess) {
- logger.trace { "Exporter failed" }
- }
- cont.resume(Unit)
+ try {
+ val result = exporter.export(metrics)
+ result.whenComplete {
+ if (!result.isSuccess) {
+ logger.trace { "Exporter failed" }
}
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
- cont.resume(Unit)
}
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
}
}
- .launchIn(scope)
+ }
override fun close() {
- readerJob.cancel()
- exporterJob.cancel()
+ job.cancel()
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index b565e90d..483558e1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -26,12 +26,8 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -39,6 +35,8 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -46,18 +44,17 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
-import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
import org.opendc.web.client.model.Scenario
import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
+import java.time.Duration
import java.util.*
-import kotlin.random.Random
-import kotlin.random.asJavaRandom
import org.opendc.web.client.model.Portfolio as ClientPortfolio
private val logger = KotlinLogging.logger {}
@@ -158,7 +155,7 @@ class RunnerCli : CliktCommand(name = "runner") {
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
}
}
@@ -182,63 +179,51 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
runBlockingSimulation {
- val seed = repeat
val workloadName = scenario.trace.traceId
val workloadFraction = scenario.trace.loadSamplingFraction
- val seeder = Random(seed)
-
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
- val metricProducer = meterProvider as MetricProducer
+ val seeder = Random(repeat.toLong())
val operational = scenario.operationalPhenomena
- val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder)
+ val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
val trace = ParquetTraceReader(
listOf(traceReader),
Workload(workloadName, workloadFraction),
- seed
+ repeat
)
- val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
-
- withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
- val faultInjector = if (failureFrequency > 0) {
- logger.debug { "ENABLING failures" }
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- failureFrequency,
- )
- } else {
+ val failureModel =
+ if (operational.failuresEnabled)
+ grid5000(Duration.ofDays(7), repeat)
+ else
null
- }
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+ )
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
- faultInjector?.close()
- }
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
logger.debug {
- "Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
}
}
} catch (cause: Throwable) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index e0e3488f..a0c281e8 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -65,10 +65,10 @@ public class ScenarioManager(private val client: ApiClient) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
- "total_requested_burst" to results.map { it.totalWork },
- "total_granted_burst" to results.map { it.totalGrantedWork },
- "total_overcommitted_burst" to results.map { it.totalOvercommittedWork },
- "total_interfered_burst" to results.map { it.totalInterferedWork },
+ "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
+ "total_granted_burst" to results.map { it.totalActiveTime },
+ "total_overcommitted_burst" to results.map { it.totalStealTime },
+ "total_interfered_burst" to results.map { it.totalLostTime },
"mean_cpu_usage" to results.map { it.meanCpuUsage },
"mean_cpu_demand" to results.map { it.meanCpuDemand },
"mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
index c8e58dde..bb412738 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -22,60 +22,51 @@
package org.opendc.web.runner
-import mu.KotlinLogging
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import kotlin.math.max
+import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-public class WebComputeMonitor : ComputeMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun onStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
+class WebComputeMonitor : ComputeMonitor {
override fun record(data: HostData) {
- val duration = 5 * 60 * 1000L
- val slices = duration / SLICE_LENGTH
+ val slices = data.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalWork + data.totalWork,
- hostAggregateMetrics.totalGrantedWork + data.grantedWork,
- hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
- hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
- hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ hostAggregateMetrics.totalActiveTime + data.cpuActiveTime,
+ hostAggregateMetrics.totalIdleTime + data.cpuIdleTime,
+ hostAggregateMetrics.totalStealTime + data.cpuStealTime,
+ hostAggregateMetrics.totalLostTime + data.cpuLostTime,
+ hostAggregateMetrics.totalPowerDraw + data.powerTotal,
+ hostAggregateMetrics.totalFailureSlices + slices,
+ hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices
)
- hostMetrics.compute(data.host) { _, prev ->
+ hostMetrics.compute(data.host.id) { _, prev ->
HostMetrics(
- (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
- data.instanceCount + (prev?.instanceCount ?: 0),
+ data.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ data.cpuDemand + (prev?.cpuDemand ?: 0.0),
+ data.guestsRunning + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
}
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
- private val SLICE_LENGTH: Long = 5 * 60 * 1000
+ private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
+ private val SLICE_LENGTH: Long = 5 * 60
data class AggregateHostMetrics(
- val totalWork: Double = 0.0,
- val totalGrantedWork: Double = 0.0,
- val totalOvercommittedWork: Double = 0.0,
- val totalInterferedWork: Double = 0.0,
+ val totalActiveTime: Long = 0L,
+ val totalIdleTime: Long = 0L,
+ val totalStealTime: Long = 0L,
+ val totalLostTime: Long = 0L,
val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
+ val totalFailureSlices: Double = 0.0,
+ val totalFailureVmSlices: Double = 0.0,
)
data class HostMetrics(
@@ -89,15 +80,15 @@ public class WebComputeMonitor : ComputeMonitor {
override fun record(data: ServiceData) {
serviceMetrics = AggregateServiceMetrics(
- max(data.instanceCount, serviceMetrics.vmTotalCount),
- max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount),
- max(data.runningInstanceCount, serviceMetrics.vmActiveCount),
- max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount),
- max(data.failedInstanceCount, serviceMetrics.vmFailedCount),
+ max(data.attemptsSuccess, serviceMetrics.vmTotalCount),
+ max(data.serversPending, serviceMetrics.vmWaitingCount),
+ max(data.serversActive, serviceMetrics.vmActiveCount),
+ max(0, serviceMetrics.vmInactiveCount),
+ max(data.attemptsFailure, serviceMetrics.vmFailedCount),
)
}
- public data class AggregateServiceMetrics(
+ data class AggregateServiceMetrics(
val vmTotalCount: Int = 0,
val vmWaitingCount: Int = 0,
val vmActiveCount: Int = 0,
@@ -105,19 +96,19 @@ public class WebComputeMonitor : ComputeMonitor {
val vmFailedCount: Int = 0
)
- public fun getResult(): Result {
+ fun getResult(): Result {
return Result(
- hostAggregateMetrics.totalWork,
- hostAggregateMetrics.totalGrantedWork,
- hostAggregateMetrics.totalOvercommittedWork,
- hostAggregateMetrics.totalInterferedWork,
+ hostAggregateMetrics.totalActiveTime,
+ hostAggregateMetrics.totalIdleTime,
+ hostAggregateMetrics.totalStealTime,
+ hostAggregateMetrics.totalLostTime,
hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
+ hostAggregateMetrics.totalFailureSlices.roundToLong(),
+ hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
serviceMetrics.vmTotalCount,
serviceMetrics.vmWaitingCount,
serviceMetrics.vmInactiveCount,
@@ -126,10 +117,10 @@ public class WebComputeMonitor : ComputeMonitor {
}
data class Result(
- val totalWork: Double,
- val totalGrantedWork: Double,
- val totalOvercommittedWork: Double,
- val totalInterferedWork: Double,
+ val totalActiveTime: Long,
+ val totalIdleTime: Long,
+ val totalStealTime: Long,
+ val totalLostTime: Long,
val meanCpuUsage: Double,
val meanCpuDemand: Double,
val meanNumDeployedImages: Double,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index d3358ef1..a0248a93 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,7 +22,7 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
- * @param meter The meter to use.
+ * @param meterProvider The meter provider to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- meter,
+ meterProvider,
compute,
mode,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 5329143d..a0fd3fad 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import mu.KotlinLogging
@@ -48,7 +49,7 @@ import kotlin.coroutines.resume
public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -67,6 +68,11 @@ public class WorkflowServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to collect metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.workflow.service")
+
+ /**
* The incoming jobs ready to be processed by the scheduler.
*/
internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 07433d1f..74316437 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import java.time.Duration
import java.util.*
/**
@@ -79,24 +80,23 @@ internal class WorkflowServiceTest {
emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ MeterProvider.noop(),
hvProvider,
)
}
- val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)
+ val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
hosts.forEach { compute.addHost(it) }
val scheduler = WorkflowService(
coroutineContext,
clock,
- meterProvider.get("opendc-workflow"),
+ meterProvider,
compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 60e67e2b..933febe0 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -31,7 +31,6 @@ include(":opendc-faas:opendc-faas-api")
include(":opendc-faas:opendc-faas-service")
include(":opendc-faas:opendc-faas-simulator")
include(":opendc-experiments:opendc-experiments-capelin")
-include(":opendc-experiments:opendc-experiments-energy21")
include(":opendc-experiments:opendc-experiments-serverless20")
include(":opendc-experiments:opendc-experiments-tf20")
include(":opendc-web:opendc-web-api")