summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt26
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt81
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt43
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt54
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt256
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt67
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt7
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt86
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt222
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt190
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt3
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts1
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt7
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt26
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt5
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt30
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt6
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt295
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt14
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt44
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt51
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt4
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt28
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt5
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt37
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt72
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt40
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt8
38 files changed, 1148 insertions, 759 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f1c055d4..824becf4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,9 +22,8 @@
package org.opendc.compute.service.internal
-import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -35,6 +34,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -42,15 +42,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -63,6 +66,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -365,10 +373,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -414,7 +424,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
_waitingServers.add(-1)
- _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
+ _schedulerDuration.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index d036ec00..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -61,8 +61,7 @@ internal class ComputeServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 28fd8217..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,8 +99,8 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,8 +158,8 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,8 +221,8 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index a1cc3390..be6ef11e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -25,6 +25,7 @@ package org.opendc.compute.simulator
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.*
import mu.KotlinLogging
@@ -59,7 +60,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- private val meter: Meter,
+ meterProvider: MeterProvider,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -82,6 +83,11 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the simulated host.
+ */
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
+
+ /**
* The event listeners registered with this host.
*/
private val listeners = mutableListOf<HostListener>()
@@ -142,10 +148,9 @@ public class SimHost(
* The total number of guests.
*/
private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
+ .setDescription("Total number of guests")
.setUnit("1")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The number of active guests on the host.
@@ -154,7 +159,6 @@ public class SimHost(
.setDescription("Number of active guests")
.setUnit("1")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The CPU demand of the host.
@@ -163,7 +167,6 @@ public class SimHost(
.setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The CPU usage of the host.
@@ -172,7 +175,6 @@ public class SimHost(
.setDescription("The amount of CPU resources used by the host")
.setUnit("MHz")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The power usage of the host.
@@ -181,7 +183,6 @@ public class SimHost(
.setDescription("The amount of power used by the CPU")
.setUnit("W")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The total amount of work supplied to the CPU.
@@ -191,7 +192,6 @@ public class SimHost(
.setUnit("1")
.ofDoubles()
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The work performed by the CPU.
@@ -201,7 +201,6 @@ public class SimHost(
.setUnit("1")
.ofDoubles()
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The amount not performed by the CPU due to overcommitment.
@@ -211,7 +210,6 @@ public class SimHost(
.setUnit("1")
.ofDoubles()
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The amount of work not performed by the CPU due to interference.
@@ -221,7 +219,6 @@ public class SimHost(
.setUnit("1")
.ofDoubles()
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The amount of time in the system.
@@ -230,7 +227,6 @@ public class SimHost(
.setDescription("The amount of time in the system")
.setUnit("ms")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The uptime of the host.
@@ -239,7 +235,6 @@ public class SimHost(
.setDescription("The uptime of the host")
.setUnit("ms")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The downtime of the host.
@@ -248,7 +243,6 @@ public class SimHost(
.setDescription("The downtime of the host")
.setUnit("ms")
.build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
init {
// Launch hypervisor onto machine
@@ -391,13 +385,28 @@ public class SimHost(
var state: ServerState = ServerState.TERMINATED
/**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
* The amount of time in the system.
*/
private val _totalTime = meter.counterBuilder("guest.time.total")
.setDescription("The amount of time in the system")
.setUnit("ms")
.build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+ .bind(attributes)
/**
* The uptime of the guest.
@@ -406,7 +415,7 @@ public class SimHost(
.setDescription("The uptime of the guest")
.setUnit("ms")
.build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+ .bind(attributes)
/**
* The time the guest is in an error state.
@@ -415,7 +424,7 @@ public class SimHost(
.setDescription("The time the guest is in an error state")
.setUnit("ms")
.build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+ .bind(attributes)
suspend fun start() {
when (state) {
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 9fa8af34..318b5a5d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -49,6 +49,7 @@ import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
+import kotlin.math.roundToLong
/**
* Basic test-suite for the hypervisor.
@@ -72,7 +73,7 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
+ var totalWork = 0L
var grantedWork = 0L
var overcommittedWork = 0L
@@ -89,7 +90,7 @@ internal class SimHostTest {
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -134,15 +135,10 @@ internal class SimHostTest {
object : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.overcommit"]?.let {
- overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
+
+ totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
+ grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
+ overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong()
return CompletableResultCode.ofSuccess()
}
@@ -176,7 +172,7 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, requestedWork, "Requested work does not match") },
+ { assertEquals(4147200, totalWork, "Requested work does not match") },
{ assertEquals(2107200, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1500001, clock.millis()) }
@@ -188,7 +184,7 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var requestedWork = 0L
+ var totalWork = 0L
var grantedWork = 0L
var totalTime = 0L
var downTime = 0L
@@ -208,7 +204,7 @@ internal class SimHostTest {
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -237,24 +233,14 @@ internal class SimHostTest {
object : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["host.time.total"]?.let {
- totalTime = it.longSumData.points.first().value
- }
- metricsByName["host.time.down"]?.let {
- downTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.total"]?.let {
- guestTotalTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.error"]?.let {
- guestDownTime = it.longSumData.points.first().value
- }
+
+ totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
+ grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
+ totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value
+ downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value
+ guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value
+ guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value
+
return CompletableResultCode.ofSuccess()
}
@@ -290,8 +276,8 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226039, requestedWork, "Total time does not match") },
- { assertEquals(1086039, grantedWork, "Down time does not match") },
+ { assertEquals(2226040, totalWork, "Total time does not match") },
+ { assertEquals(1086040, grantedWork, "Down time does not match") },
{ assertEquals(1200001, totalTime, "Total time does not match") },
{ assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
{ assertEquals(5000, downTime, "Down time does not match") },
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 8227bca9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import kotlinx.coroutines.*
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.ReplayScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
-import org.opendc.compute.service.scheduler.weights.RamWeigher
-import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- interferenceModel: VmInterferenceModel? = null,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Process the trace.
- */
-suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- monitor: ComputeMonitor? = null,
-) {
- val client = scheduler.newClient()
- val watcher = object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
- }
- }
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
- server.watch(watcher)
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
-
-/**
- * Create a [MeterProvider] instance for the experiment.
- */
-fun createMeterProvider(clock: Clock): MeterProvider {
- return SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-}
-
-/**
- * Create a [ComputeScheduler] for the experiment.
- */
-fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
- val cpuAllocationRatio = 16.0
- val ramAllocationRatio = 1.5
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = 1.0))
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = -1.0))
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = -1.0))
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = emptyList(),
- subsetSize = Int.MAX_VALUE,
- random = java.util.Random(seeder.nextLong())
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 82794471..f7f9336e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,10 +23,7 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.io.FileInputStream
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
+import kotlin.math.roundToLong
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) {
/**
* Perform a single trial for this portfolio.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
-
- val meterProvider = createMeterProvider(clock)
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
} else {
listOf(workload.name)
}
-
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
-
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(FileInputStream(config.getString("interference-model")))
@@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
+ val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
+ val failureModel =
+ if (operationalPhenomena.failureFrequency > 0)
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ else
+ null
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ performanceInterferenceModel
+ )
val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- )
- } else {
- null
- }
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
-
- faultInjector?.close()
- monitor.close()
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0])
logger.debug {
"Finish " +
"SUBMIT=${monitorResults.instanceCount} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 7062a275..fa00fc35 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.HostData
import java.io.File
@@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
override fun convert(builder: GenericRecordBuilder, data: HostData) {
builder["timestamp"] = data.timestamp
- builder["host_id"] = data.host.name
- builder["powered_on"] = data.host.state == HostState.UP
+ builder["host_id"] = data.host.id
+ builder["powered_on"] = true
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
builder["total_work"] = data.totalWork
@@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
builder["cpu_demand"] = data.cpuDemand
builder["power_draw"] = data.powerDraw
builder["num_instances"] = data.instanceCount
- builder["num_cpus"] = data.host.model.cpuCount
+ builder["num_cpus"] = data.host.cpuCount
}
override fun toString(): String = "host-writer"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
index 9904adde..bb2db4b7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
override fun convert(builder: GenericRecordBuilder, data: ServerData) {
builder["timestamp"] = data.timestamp
- builder["server_id"] = data.server.uid.toString()
- builder["state"] = data.server.state
+ builder["server_id"] = data.server
+ // builder["state"] = data.server.state
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- builder["num_vcpus"] = data.server.flavor.cpuCount
- builder["mem_capacity"] = data.server.flavor.memorySize
+ // builder["num_vcpus"] = data.server.flavor.cpuCount
+ // builder["mem_capacity"] = data.server.flavor.memorySize
}
override fun toString(): String = "server-writer"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
new file mode 100644
index 00000000..3b7c3f0f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
new file mode 100644
index 00000000..065a8c93
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.MachineDef
+import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to manage a [ComputeService] simulation.
+ */
+class ComputeServiceSimulator(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ scheduler: ComputeScheduler,
+ machines: List<MachineDef>,
+ private val failureModel: FailureModel? = null,
+ interferenceModel: VmInterferenceModel? = null,
+ hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+) : AutoCloseable {
+ /**
+ * The [ComputeService] that has been configured by the manager.
+ */
+ val service: ComputeService
+
+ /**
+ * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
+ */
+ val producers: List<MetricProducer>
+ get() = _metricProducers
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The [SimResourceInterpreter] to simulate the hosts.
+ */
+ private val interpreter = SimResourceInterpreter(context, clock)
+
+ /**
+ * The hosts that belong to this class.
+ */
+ private val hosts = mutableSetOf<SimHost>()
+
+ init {
+ val (service, serviceMeterProvider) = createService(scheduler)
+ this._metricProducers.add(serviceMeterProvider)
+ this.service = service
+
+ for (def in machines) {
+ val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
+ this._metricProducers.add(hostMeterProvider)
+ hosts.add(host)
+ this.service.addHost(host)
+ }
+ }
+
+ /**
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ */
+ suspend fun run(reader: TraceReader<SimWorkload>) {
+ val injector = failureModel?.createInjector(context, clock, service)
+ val client = service.newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ while (reader.hasNext()) {
+ val entry = reader.next()
+
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (entry.start - offset) - clock.millis()))
+
+ launch {
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta + mapOf("workload" to workload)
+ )
+
+ // Wait for the server reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
+
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ reader.close()
+ client.close()
+ }
+ }
+
+ override fun close() {
+ service.close()
+
+ for (host in hosts) {
+ host.close()
+ }
+
+ hosts.clear()
+ }
+
+ /**
+ * Construct a [ComputeService] instance.
+ */
+ private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val service = ComputeService(context, clock, meterProvider, scheduler)
+ return service to meterProvider
+ }
+
+ /**
+ * Construct a [SimHost] instance for the specified [MachineDef].
+ */
+ private fun createHost(
+ def: MachineDef,
+ hypervisorProvider: SimHypervisorProvider,
+ interferenceModel: VmInterferenceModel? = null
+ ): Pair<SimHost, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(HOST_ID, def.uid.toString())
+ .put(HOST_NAME, def.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, def.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val host = SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ context,
+ interpreter,
+ meterProvider,
+ hypervisorProvider,
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
+ )
+
+ return host to meterProvider
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
new file mode 100644
index 00000000..83393896
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
new file mode 100644
index 00000000..89b4a31c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.experiments.capelin
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import org.opendc.experiments.capelin.util.FailureModel
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+import kotlin.random.Random
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService
+ ): HostFaultInjector {
+ val rng = Well19937c(seed)
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
+
+/**
+ * Obtain the [HostFaultInjector] to use for the experiments.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun createFaultInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ seed: Int,
+ failureInterval: Double
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index cf88535d..f4cf3e5e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
+import kotlin.math.roundToLong
/**
* An integration test suite for the Capelin experiments.
@@ -60,11 +62,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,26 +83,26 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -106,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
{ assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
+ { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } },
+ { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } },
+ { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
)
}
@@ -120,27 +131,26 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -151,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -164,10 +174,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,20 +183,24 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -201,10 +211,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -214,39 +224,27 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -257,9 +255,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -291,10 +289,10 @@ class CapelinIntegrationTest {
var totalPowerDraw = 0.0
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
+ this.totalWork += data.totalWork.roundToLong()
+ totalGrantedWork += data.grantedWork.roundToLong()
+ totalOvercommittedWork += data.overcommittedWork.roundToLong()
+ totalInterferedWork += data.interferedWork.roundToLong()
totalPowerDraw += data.powerDraw
}
}
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
index 650416f5..3312d6c0 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.time.Duration
import java.util.*
import kotlin.math.max
@@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") {
val delayInjector = StochasticDelayInjector(coldStartModel, Random())
val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) }
val service =
- FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000))
+ FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)))
val client = service.newClient()
coroutineScope {
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index 63bed8bc..6f4fcc9b 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7e716a34..1d5331cb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
import org.opendc.faas.service.deployer.FunctionDeployer
@@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param meter The meter to report metrics to.
+ * @param meterProvider The [MeterProvider] to create a [Meter] with.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
@@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
): FaaSService {
- return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy)
+ return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index a1cb1dbf..54df2b59 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter
import io.opentelemetry.api.metrics.BoundLongHistogram
import io.opentelemetry.api.metrics.BoundLongUpDownCounter
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.faas.service.deployer.FunctionInstance
import java.util.*
@@ -43,9 +44,14 @@ public class FunctionObject(
meta: Map<String, Any>
) : AutoCloseable {
/**
- * The function identifier attached to the metrics.
+ * The attributes of this function.
*/
- private val functionId = AttributeKey.stringKey("function")
+ public val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.FAAS_ID, uid.toString())
+ .put(ResourceAttributes.FAAS_NAME, name)
+ .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
+ .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" })
+ .build()
/**
* The total amount of function invocations received by the function.
@@ -54,7 +60,7 @@ public class FunctionObject(
.setDescription("Number of function invocations")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that could be handled directly.
@@ -63,7 +69,7 @@ public class FunctionObject(
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that were delayed due to function deployment.
@@ -72,7 +78,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that failed.
@@ -81,7 +87,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of instances for this function.
@@ -90,7 +96,7 @@ public class FunctionObject(
.setDescription("Number of active function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of idle instances for this function.
@@ -99,7 +105,7 @@ public class FunctionObject(
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function waited.
@@ -109,7 +115,7 @@ public class FunctionObject(
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function was running.
@@ -119,7 +125,7 @@ public class FunctionObject(
.setDescription("Time the function was running")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The instances associated with this function.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index 1e224ed1..63dbadc7 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext
public class FunctionTerminationPolicyFixed(
context: CoroutineContext,
clock: Clock,
- public val timeout: Long
+ public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
@@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout) { instance.close() }
+ scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index ccf9a5d9..3b560cd3 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
@@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException
internal class FaaSServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ private val meterProvider: MeterProvider,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy
@@ -70,6 +71,11 @@ internal class FaaSServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] that collects the metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.faas.service")
+
+ /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 6b99684a..1612e10b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,8 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -59,8 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -69,8 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -81,8 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -95,8 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -109,8 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -123,8 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -135,8 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -150,8 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -163,9 +154,8 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 64f2551b..0dc9ba87 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -43,6 +43,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Duration
/**
* A test suite for the [FaaSService] implementation under simulated conditions.
@@ -64,14 +65,13 @@ internal class SimFaaSServiceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) {
override suspend fun invoke() {}
})
val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload }
val service = FaaSService(
- coroutineContext, clock, meter, deployer, RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000)
+ coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(),
+ FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index 6a3de9bc..cd8cb57a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -31,7 +31,6 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 57d43c60..408d1325 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -22,137 +22,260 @@
package org.opendc.telemetry.compute
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.compute.service.driver.Host
import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServerInfo
import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ComputeMetricExporter(
- private val clock: Clock,
- private val hosts: Map<String, Host>,
- private val monitor: ComputeMonitor
-) : MetricExporter {
-
+public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
- reportHostMetrics(metrics)
reportServiceMetrics(metrics)
+ reportHostMetrics(metrics)
+ reportServerMetrics(metrics)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
}
}
- private var lastHostMetrics: Map<String, HBuffer> = emptyMap()
- private val hostMetricsSingleton = HBuffer()
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ private fun reportServiceMetrics(metrics: Collection<MetricData>) {
+ monitor.record(extractServiceMetrics(clock.millis(), metrics))
+ }
+
+ private val hosts = mutableMapOf<String, HostAggregator>()
+ private val servers = mutableMapOf<String, ServerAggregator>()
private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hostMetrics = mutableMapOf<String, HBuffer>()
+ val hosts = hosts
+ val servers = servers
+
+ for (metric in metrics) {
+ val resource = metric.resource
+ val hostId = resource.attributes[HOST_ID] ?: continue
+ val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
+ agg.accept(metric)
+ }
+
+ val monitor = monitor
+ val now = clock.millis()
+ for ((_, server) in servers) {
+ server.record(monitor, now)
+ }
+ }
+
+ private fun reportServerMetrics(metrics: Collection<MetricData>) {
+ val hosts = hosts
for (metric in metrics) {
+ val resource = metric.resource
+ val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host }
+
when (metric.name) {
- "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
- "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
- "power.usage" -> mapDoubleHistogram(metric, hostMetrics) { m, v -> m.powerDraw = v }
- "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
- "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
- "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
- "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
- "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
- "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v }
- "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v }
+ "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point ->
+ agg.schedulingLatency = point.sum / point.count
+ }
+ "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point ->
+ agg.uptime = point.value
+ }
+ "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point ->
+ agg.downtime = point.value
+ }
}
}
- for ((id, hostMetric) in hostMetrics) {
- val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
- val host = hosts[id] ?: continue
+ val monitor = monitor
+ val now = clock.millis()
+ for ((_, host) in hosts) {
+ host.record(monitor, now)
+ }
+ }
+
+ /**
+ * Helper function to map a metric by the server.
+ */
+ private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) {
+ for (point in points) {
+ val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue
+ val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) }
+
+ if (host != null) {
+ agg.host = host
+ }
+
+ block(agg, point)
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostAggregator(resource: Resource) {
+ /**
+ * The static information about this host.
+ */
+ val host = HostInfo(
+ resource.attributes[HOST_ID]!!,
+ resource.attributes[HOST_NAME]!!,
+ resource.attributes[HOST_ARCH]!!,
+ resource.attributes[HOST_NCPUS]!!.toInt(),
+ resource.attributes[HOST_MEM_CAPACITY]!!,
+ )
+
+ private var totalWork: Double = 0.0
+ private var previousTotalWork = 0.0
+ private var grantedWork: Double = 0.0
+ private var previousGrantedWork = 0.0
+ private var overcommittedWork: Double = 0.0
+ private var previousOvercommittedWork = 0.0
+ private var interferedWork: Double = 0.0
+ private var previousInterferedWork = 0.0
+ private var cpuUsage: Double = 0.0
+ private var cpuDemand: Double = 0.0
+ private var instanceCount: Int = 0
+ private var powerDraw: Double = 0.0
+ private var uptime: Long = 0
+ private var previousUptime = 0L
+ private var downtime: Long = 0
+ private var previousDowntime = 0L
+ fun record(monitor: ComputeMonitor, now: Long) {
monitor.record(
HostData(
- clock.millis(),
+ now,
host,
- hostMetric.totalWork - lastHostMetric.totalWork,
- hostMetric.grantedWork - lastHostMetric.grantedWork,
- hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
- hostMetric.interferedWork - lastHostMetric.interferedWork,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.instanceCount,
- hostMetric.powerDraw,
- hostMetric.uptime - lastHostMetric.uptime,
- hostMetric.downtime - lastHostMetric.downtime,
+ totalWork - previousTotalWork,
+ grantedWork - previousGrantedWork,
+ overcommittedWork - previousOvercommittedWork,
+ interferedWork - previousInterferedWork,
+ cpuUsage,
+ cpuDemand,
+ instanceCount,
+ powerDraw,
+ uptime - previousUptime,
+ downtime - previousDowntime,
)
)
- }
-
- lastHostMetrics = hostMetrics
- }
- private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
- val points = data.doubleSummaryData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
+ previousTotalWork = totalWork
+ previousGrantedWork = grantedWork
+ previousOvercommittedWork = overcommittedWork
+ previousInterferedWork = interferedWork
+ previousUptime = uptime
+ previousDowntime = downtime
+ reset()
}
- }
- private fun mapDoubleHistogram(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
- val points = data.doubleHistogramData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- block(hostMetric, point.sum / point.count)
+ /**
+ * Accept the [MetricData] for this host.
+ */
+ fun accept(data: MetricData) {
+ when (data.name) {
+ "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value
+ "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value
+ "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value
+ "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value
+ "power.usage" -> powerDraw = acceptHistogram(data)
+ "cpu.usage" -> cpuUsage = acceptHistogram(data)
+ "cpu.demand" -> cpuDemand = acceptHistogram(data)
+ "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt()
+ "host.time.up" -> uptime = data.longSumData.points.first().value
+ "host.time.down" -> downtime = data.longSumData.points.first().value
+ }
}
- }
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) {
- val points = data?.longSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- block(hostMetric, point.value)
+ private fun acceptHistogram(data: MetricData): Double {
+ return when (data.type) {
+ MetricDataType.HISTOGRAM -> {
+ val point = data.doubleHistogramData.points.first()
+ point.sum / point.count
+ }
+ MetricDataType.SUMMARY -> {
+ val point = data.doubleSummaryData.points.first()
+ point.sum / point.count
+ }
+ else -> error("Invalid metric type")
+ }
}
- }
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
- val points = data?.doubleSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
- block(hostMetric, point.value)
+ private fun reset() {
+ totalWork = 0.0
+ grantedWork = 0.0
+ overcommittedWork = 0.0
+ interferedWork = 0.0
+ cpuUsage = 0.0
+ cpuDemand = 0.0
+ instanceCount = 0
+ powerDraw = 0.0
+ uptime = 0L
+ downtime = 0L
}
}
/**
- * A buffer for host metrics before they are reported.
+ * An aggregator for server metrics before they are reported.
*/
- private class HBuffer {
- var totalWork: Double = 0.0
- var grantedWork: Double = 0.0
- var overcommittedWork: Double = 0.0
- var interferedWork: Double = 0.0
- var cpuUsage: Double = 0.0
- var cpuDemand: Double = 0.0
- var instanceCount: Int = 0
- var powerDraw: Double = 0.0
- var uptime: Long = 0
- var downtime: Long = 0
- }
+ private class ServerAggregator(attributes: Attributes) {
+ /**
+ * The static information about this server.
+ */
+ val server = ServerInfo(
+ attributes[ResourceAttributes.HOST_ID]!!,
+ attributes[ResourceAttributes.HOST_NAME]!!,
+ attributes[ResourceAttributes.HOST_TYPE]!!,
+ attributes[ResourceAttributes.HOST_ARCH]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+ attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+ attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+ )
- private fun reportServiceMetrics(metrics: Collection<MetricData>) {
- monitor.record(extractServiceMetrics(clock.millis(), metrics))
- }
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ var host: HostInfo? = null
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ @JvmField var uptime: Long = 0
+ private var previousUptime = 0L
+ @JvmField var downtime: Long = 0
+ private var previousDowntime = 0L
+ @JvmField var schedulingLatency = 0.0
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ fun record(monitor: ComputeMonitor, now: Long) {
+ monitor.record(
+ ServerData(
+ now,
+ server,
+ null,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ )
+ )
+
+ previousUptime = uptime
+ previousDowntime = downtime
+ reset()
+ }
+
+ private fun reset() {
+ host = null
+ uptime = 0L
+ downtime = 0L
+ }
+ }
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index ec303b37..d51bcab4 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -22,10 +22,6 @@
package org.opendc.telemetry.compute
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.compute.table.ServiceData
@@ -35,16 +31,6 @@ import org.opendc.telemetry.compute.table.ServiceData
*/
public interface ComputeMonitor {
/**
- * This method is invoked when the state of a [Server] changes.
- */
- public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {}
-
- /**
- * This method is invoked when the state of a [Host] changes.
- */
- public fun onStateChange(time: Long, host: Host, newState: HostState) {}
-
- /**
* Record the specified [data].
*/
public fun record(data: ServerData) {}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index 01df0e69..1f309f1b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -24,51 +24,7 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import java.time.Clock
-import java.time.Duration
-
-/**
- * Attach the specified monitor to the OpenDC Compute service.
- */
-public suspend fun withMonitor(
- scheduler: ComputeService,
- clock: Clock,
- metricProducer: MetricProducer,
- monitor: ComputeMonitor,
- exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.onStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.onStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor),
- exportInterval
- )
-
- try {
- block(this)
- } finally {
- reader.close()
- }
-}
/**
* Collect the metrics of the compute service.
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
new file mode 100644
index 00000000..7dca6186
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("HostAttributes")
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+
+/**
+ * The identifier of the node hosting virtual machines.
+ */
+public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id")
+
+/**
+ * The name of the node hosting virtual machines.
+ */
+public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name")
+
+/**
+ * The CPU architecture of the host node.
+ */
+public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch")
+
+/**
+ * The number of CPUs in the host node.
+ */
+public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus")
+
+/**
+ * The amount of memory installed in the host node in MiB.
+ */
+public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity")
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index 8e6c34d0..e3ecda3d 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -22,14 +22,12 @@
package org.opendc.telemetry.compute.table
-import org.opendc.compute.service.driver.Host
-
/**
* A trace entry for a particular host.
*/
public data class HostData(
public val timestamp: Long,
- public val host: Host,
+ public val host: HostInfo,
public val totalWork: Double,
public val grantedWork: Double,
public val overcommittedWork: Double,
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
new file mode 100644
index 00000000..d9a5906b
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Information about a host exposed to the telemetry service.
+ */
+public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 2a9fa8a6..7fde86d9 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -22,14 +22,13 @@
package org.opendc.telemetry.compute.table
-import org.opendc.compute.api.Server
-
/**
* A trace entry for a particular server.
*/
public data class ServerData(
public val timestamp: Long,
- public val server: Server,
+ public val server: ServerInfo,
+ public val host: HostInfo?,
public val uptime: Long,
public val downtime: Long,
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
new file mode 100644
index 00000000..b16e5f3d
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Static information about a server exposed to the telemetry service.
+ */
+public data class ServerInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val imageId: String,
+ val imageName: String,
+ val cpuCount: Int,
+ val memCapacity: Long
+)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index b565e90d..b9d5a3f5 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario
import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
+import java.time.Duration
import java.util.*
-import kotlin.random.Random
-import kotlin.random.asJavaRandom
import org.opendc.web.client.model.Portfolio as ClientPortfolio
private val logger = KotlinLogging.logger {}
@@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
}
}
@@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
runBlockingSimulation {
- val seed = repeat
val workloadName = scenario.trace.traceId
val workloadFraction = scenario.trace.loadSamplingFraction
- val seeder = Random(seed)
+ val seeder = Random(repeat.toLong())
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
.build()
- val metricProducer = meterProvider as MetricProducer
val operational = scenario.operationalPhenomena
- val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder)
+ val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
val trace = ParquetTraceReader(
listOf(traceReader),
Workload(workloadName, workloadFraction),
- seed
+ repeat
)
- val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
-
- withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
- val faultInjector = if (failureFrequency > 0) {
- logger.debug { "ENABLING failures" }
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- failureFrequency,
- )
- } else {
+ val failureModel =
+ if (operational.failuresEnabled)
+ grid5000(Duration.ofDays(7), repeat)
+ else
null
- }
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+ )
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- faultInjector?.close()
- }
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
logger.debug {
"Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
}
}
} catch (cause: Throwable) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
index c8e58dde..4b813310 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -22,27 +22,19 @@
package org.opendc.web.runner
-import mu.KotlinLogging
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import kotlin.math.max
+import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-public class WebComputeMonitor : ComputeMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun onStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
+class WebComputeMonitor : ComputeMonitor {
override fun record(data: HostData) {
- val duration = 5 * 60 * 1000L
- val slices = duration / SLICE_LENGTH
+ val duration = data.uptime
+ val slices = data.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
hostAggregateMetrics.totalWork + data.totalWork,
@@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor {
hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ hostAggregateMetrics.totalFailureSlices + slices,
+ hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices
)
- hostMetrics.compute(data.host) { _, prev ->
+ hostMetrics.compute(data.host.id) { _, prev ->
HostMetrics(
- (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ data.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ data.cpuDemand + (prev?.cpuDemand ?: 0.0),
data.instanceCount + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
@@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor {
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
private val SLICE_LENGTH: Long = 5 * 60 * 1000
data class AggregateHostMetrics(
@@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor {
val totalOvercommittedWork: Double = 0.0,
val totalInterferedWork: Double = 0.0,
val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
+ val totalFailureSlices: Double = 0.0,
+ val totalFailureVmSlices: Double = 0.0,
)
data class HostMetrics(
@@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor {
)
}
- public data class AggregateServiceMetrics(
+ data class AggregateServiceMetrics(
val vmTotalCount: Int = 0,
val vmWaitingCount: Int = 0,
val vmActiveCount: Int = 0,
@@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor {
val vmFailedCount: Int = 0
)
- public fun getResult(): Result {
+ fun getResult(): Result {
return Result(
hostAggregateMetrics.totalWork,
hostAggregateMetrics.totalGrantedWork,
@@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor {
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
+ hostAggregateMetrics.totalFailureSlices.roundToLong(),
+ hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
serviceMetrics.vmTotalCount,
serviceMetrics.vmWaitingCount,
serviceMetrics.vmInactiveCount,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index d3358ef1..a0248a93 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,7 +22,7 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
- * @param meter The meter to use.
+ * @param meterProvider The meter provider to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- meter,
+ meterProvider,
compute,
mode,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 5329143d..a0fd3fad 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import mu.KotlinLogging
@@ -48,7 +49,7 @@ import kotlin.coroutines.resume
public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -67,6 +68,11 @@ public class WorkflowServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to collect metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.workflow.service")
+
+ /**
* The incoming jobs ready to be processed by the scheduler.
*/
internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 07433d1f..74316437 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import java.time.Duration
import java.util.*
/**
@@ -79,24 +80,23 @@ internal class WorkflowServiceTest {
emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ MeterProvider.noop(),
hvProvider,
)
}
- val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)
+ val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
hosts.forEach { compute.addHost(it) }
val scheduler = WorkflowService(
coroutineContext,
clock,
- meterProvider.get("opendc-workflow"),
+ meterProvider,
compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,