diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-14 14:41:05 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-17 16:48:53 +0200 |
| commit | 3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch) | |
| tree | 9cad172501154d01b00a1e5c45d94d7e2f1e5698 | |
| parent | 5f0b6b372487d79594cf59010822e160f351e0be (diff) | |
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a
separate MeterProvider per service or host. This means we have to keep
track of multiple metric producers, but that we can attach resource
information to each of the MeterProviders like we would in a real world
scenario.
38 files changed, 1148 insertions, 759 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f1c055d4..824becf4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,9 +22,8 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -35,6 +34,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -42,15 +42,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -63,6 +66,11 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + + /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -365,10 +373,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -414,7 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) + _schedulerDuration.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -50,6 +53,21 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** + * The attributes of a server. + */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + + /** * The [Host] that has been assigned to host the server. */ internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index d036ec00..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -61,8 +61,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 28fd8217..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,8 +99,8 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,8 +158,8 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -220,8 +221,8 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk<InternalFlavor>() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk<InternalImage>() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index a1cc3390..be6ef11e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -25,6 +25,7 @@ package org.opendc.compute.simulator import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging @@ -59,7 +60,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, interpreter: SimResourceInterpreter, - private val meter: Meter, + meterProvider: MeterProvider, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -82,6 +83,11 @@ public class SimHost( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the simulated host. + */ + private val meter = meterProvider.get("org.opendc.compute.simulator") + + /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() @@ -142,10 +148,9 @@ public class SimHost( * The total number of guests. */ private val _guests = meter.upDownCounterBuilder("guests.total") - .setDescription("Number of guests") + .setDescription("Total number of guests") .setUnit("1") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The number of active guests on the host. @@ -154,7 +159,6 @@ public class SimHost( .setDescription("Number of active guests") .setUnit("1") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The CPU demand of the host. @@ -163,7 +167,6 @@ public class SimHost( .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") .setUnit("MHz") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The CPU usage of the host. @@ -172,7 +175,6 @@ public class SimHost( .setDescription("The amount of CPU resources used by the host") .setUnit("MHz") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The power usage of the host. @@ -181,7 +183,6 @@ public class SimHost( .setDescription("The amount of power used by the CPU") .setUnit("W") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The total amount of work supplied to the CPU. @@ -191,7 +192,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The work performed by the CPU. @@ -201,7 +201,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount not performed by the CPU due to overcommitment. @@ -211,7 +210,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount of work not performed by the CPU due to interference. @@ -221,7 +219,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount of time in the system. @@ -230,7 +227,6 @@ public class SimHost( .setDescription("The amount of time in the system") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The uptime of the host. @@ -239,7 +235,6 @@ public class SimHost( .setDescription("The uptime of the host") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The downtime of the host. @@ -248,7 +243,6 @@ public class SimHost( .setDescription("The downtime of the host") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) init { // Launch hypervisor onto machine @@ -391,13 +385,28 @@ public class SimHost( var state: ServerState = ServerState.TERMINATED /** + * The attributes of the guest. + */ + val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, server.name) + .put(ResourceAttributes.HOST_ID, server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) + .build() + + /** * The amount of time in the system. */ private val _totalTime = meter.counterBuilder("guest.time.total") .setDescription("The amount of time in the system") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) /** * The uptime of the guest. @@ -406,7 +415,7 @@ public class SimHost( .setDescription("The uptime of the guest") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) /** * The time the guest is in an error state. @@ -415,7 +424,7 @@ public class SimHost( .setDescription("The time the guest is in an error state") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) suspend fun start() { when (state) { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 9fa8af34..318b5a5d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -49,6 +49,7 @@ import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration import java.util.* import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * Basic test-suite for the hypervisor. @@ -72,7 +73,7 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L + var totalWork = 0L var grantedWork = 0L var overcommittedWork = 0L @@ -89,7 +90,7 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -134,15 +135,10 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection<MetricData>): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.overcommit"]?.let { - overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } + + totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() + grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() + overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong() return CompletableResultCode.ofSuccess() } @@ -176,7 +172,7 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4147200, requestedWork, "Requested work does not match") }, + { assertEquals(4147200, totalWork, "Requested work does not match") }, { assertEquals(2107200, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1500001, clock.millis()) } @@ -188,7 +184,7 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var requestedWork = 0L + var totalWork = 0L var grantedWork = 0L var totalTime = 0L var downTime = 0L @@ -208,7 +204,7 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -237,24 +233,14 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection<MetricData>): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["host.time.total"]?.let { - totalTime = it.longSumData.points.first().value - } - metricsByName["host.time.down"]?.let { - downTime = it.longSumData.points.first().value - } - metricsByName["guest.time.total"]?.let { - guestTotalTime = it.longSumData.points.first().value - } - metricsByName["guest.time.error"]?.let { - guestDownTime = it.longSumData.points.first().value - } + + totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() + grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() + totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value + downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value + guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value + guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value + return CompletableResultCode.ofSuccess() } @@ -290,8 +276,8 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2226039, requestedWork, "Total time does not match") }, - { assertEquals(1086039, grantedWork, "Down time does not match") }, + { assertEquals(2226040, totalWork, "Total time does not match") }, + { assertEquals(1086040, grantedWork, "Down time does not match") }, { assertEquals(1200001, totalTime, "Total time does not match") }, { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, { assertEquals(5000, downTime, "Down time does not match") }, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? = null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader<SimWorkload>, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..f7f9336e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. */ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() - monitor.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + "SUBMIT=${monitorResults.instanceCount} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 7062a275..fa00fc35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: HostData) { builder["timestamp"] = data.timestamp - builder["host_id"] = data.host.name - builder["powered_on"] = data.host.state == HostState.UP + builder["host_id"] = data.host.id + builder["powered_on"] = true builder["uptime"] = data.uptime builder["downtime"] = data.downtime builder["total_work"] = data.totalWork @@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["cpu_demand"] = data.cpuDemand builder["power_draw"] = data.powerDraw builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.model.cpuCount + builder["num_cpus"] = data.host.cpuCount } override fun toString(): String = "host-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index 9904adde..bb2db4b7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServerData) { builder["timestamp"] = data.timestamp - builder["server_id"] = data.server.uid.toString() - builder["state"] = data.server.state + builder["server_id"] = data.server + // builder["state"] = data.server.state builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["num_vcpus"] = data.server.flavor.cpuCount - builder["mem_capacity"] = data.server.flavor.memorySize + // builder["num_vcpus"] = data.server.flavor.cpuCount + // builder["mem_capacity"] = data.server.flavor.memorySize } override fun toString(): String = "server-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a [ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List<MachineDef>, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + */ + suspend fun run(reader: TraceReader<SimWorkload>) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair<SimHost, SdkMeterProvider> { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index cf88535d..f4cf3e5e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* +import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. @@ -60,11 +62,20 @@ class CapelinIntegrationTest { private lateinit var monitor: TestExperimentReporter /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** * Setup the experimental environment. */ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,26 +83,26 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -106,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } @@ -120,27 +131,26 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -151,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -164,10 +174,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,20 +183,24 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -201,10 +211,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -214,39 +224,27 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -257,9 +255,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -291,10 +289,10 @@ class CapelinIntegrationTest { var totalPowerDraw = 0.0 override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() + this.totalWork += data.totalWork.roundToLong() + totalGrantedWork += data.grantedWork.roundToLong() + totalOvercommittedWork += data.overcommittedWork.roundToLong() + totalInterferedWork += data.interferedWork.roundToLong() totalPowerDraw += data.powerDraw } } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 650416f5..3312d6c0 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.time.Duration import java.util.* import kotlin.math.max @@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") { val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000)) + FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) val client = service.newClient() coroutineScope { diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 63bed8bc..6f4fcc9b 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 7e716a34..1d5331cb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer @@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meter The meter to report metrics to. + * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. @@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, ): FaaSService { - return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index a1cb1dbf..54df2b59 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter import io.opentelemetry.api.metrics.BoundLongHistogram import io.opentelemetry.api.metrics.BoundLongUpDownCounter import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.faas.service.deployer.FunctionInstance import java.util.* @@ -43,9 +44,14 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The function identifier attached to the metrics. + * The attributes of this function. */ - private val functionId = AttributeKey.stringKey("function") + public val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.FAAS_ID, uid.toString()) + .put(ResourceAttributes.FAAS_NAME, name) + .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) + .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) + .build() /** * The total amount of function invocations received by the function. @@ -54,7 +60,7 @@ public class FunctionObject( .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that could be handled directly. @@ -63,7 +69,7 @@ public class FunctionObject( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that were delayed due to function deployment. @@ -72,7 +78,7 @@ public class FunctionObject( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that failed. @@ -81,7 +87,7 @@ public class FunctionObject( .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of instances for this function. @@ -90,7 +96,7 @@ public class FunctionObject( .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of idle instances for this function. @@ -99,7 +105,7 @@ public class FunctionObject( .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function waited. @@ -109,7 +115,7 @@ public class FunctionObject( .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function was running. @@ -119,7 +125,7 @@ public class FunctionObject( .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 1e224ed1..63dbadc7 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext public class FunctionTerminationPolicyFixed( context: CoroutineContext, clock: Clock, - public val timeout: Long + public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout) { instance.close() } + scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ccf9a5d9..3b560cd3 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + private val meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -70,6 +71,11 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] that collects the metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.faas.service") + + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 6b99684a..1612e10b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,8 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -69,8 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -81,8 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -95,8 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -109,8 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -123,8 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -135,8 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -150,8 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -163,9 +154,8 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 64f2551b..0dc9ba87 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import java.time.Duration /** * A test suite for the [FaaSService] implementation under simulated conditions. @@ -64,14 +65,13 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { override suspend fun invoke() {} }) val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } val service = FaaSService( - coroutineContext, clock, meter, deployer, RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000) + coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) val client = service.newClient() diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 6a3de9bc..cd8cb57a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -31,7 +31,6 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcCompute.opendcComputeSimulator) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 57d43c60..408d1325 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -22,137 +22,260 @@ package org.opendc.telemetry.compute +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.driver.Host import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerInfo import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter( - private val clock: Clock, - private val hosts: Map<String, Host>, - private val monitor: ComputeMonitor -) : MetricExporter { - +public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { - reportHostMetrics(metrics) reportServiceMetrics(metrics) + reportHostMetrics(metrics) + reportServerMetrics(metrics) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() } } - private var lastHostMetrics: Map<String, HBuffer> = emptyMap() - private val hostMetricsSingleton = HBuffer() + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + + private fun reportServiceMetrics(metrics: Collection<MetricData>) { + monitor.record(extractServiceMetrics(clock.millis(), metrics)) + } + + private val hosts = mutableMapOf<String, HostAggregator>() + private val servers = mutableMapOf<String, ServerAggregator>() private fun reportHostMetrics(metrics: Collection<MetricData>) { - val hostMetrics = mutableMapOf<String, HBuffer>() + val hosts = hosts + val servers = servers + + for (metric in metrics) { + val resource = metric.resource + val hostId = resource.attributes[HOST_ID] ?: continue + val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) } + agg.accept(metric) + } + + val monitor = monitor + val now = clock.millis() + for ((_, server) in servers) { + server.record(monitor, now) + } + } + + private fun reportServerMetrics(metrics: Collection<MetricData>) { + val hosts = hosts for (metric in metrics) { + val resource = metric.resource + val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host } + when (metric.name) { - "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } - "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleHistogram(metric, hostMetrics) { m, v -> m.powerDraw = v } - "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } - "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } - "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } - "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } - "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } - "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } - "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } + "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point -> + agg.schedulingLatency = point.sum / point.count + } + "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point -> + agg.uptime = point.value + } + "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point -> + agg.downtime = point.value + } } } - for ((id, hostMetric) in hostMetrics) { - val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts[id] ?: continue + val monitor = monitor + val now = clock.millis() + for ((_, host) in hosts) { + host.record(monitor, now) + } + } + + /** + * Helper function to map a metric by the server. + */ + private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) { + for (point in points) { + val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) } + + if (host != null) { + agg.host = host + } + + block(agg, point) + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostAggregator(resource: Resource) { + /** + * The static information about this host. + */ + val host = HostInfo( + resource.attributes[HOST_ID]!!, + resource.attributes[HOST_NAME]!!, + resource.attributes[HOST_ARCH]!!, + resource.attributes[HOST_NCPUS]!!.toInt(), + resource.attributes[HOST_MEM_CAPACITY]!!, + ) + + private var totalWork: Double = 0.0 + private var previousTotalWork = 0.0 + private var grantedWork: Double = 0.0 + private var previousGrantedWork = 0.0 + private var overcommittedWork: Double = 0.0 + private var previousOvercommittedWork = 0.0 + private var interferedWork: Double = 0.0 + private var previousInterferedWork = 0.0 + private var cpuUsage: Double = 0.0 + private var cpuDemand: Double = 0.0 + private var instanceCount: Int = 0 + private var powerDraw: Double = 0.0 + private var uptime: Long = 0 + private var previousUptime = 0L + private var downtime: Long = 0 + private var previousDowntime = 0L + fun record(monitor: ComputeMonitor, now: Long) { monitor.record( HostData( - clock.millis(), + now, host, - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.instanceCount, - hostMetric.powerDraw, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, + totalWork - previousTotalWork, + grantedWork - previousGrantedWork, + overcommittedWork - previousOvercommittedWork, + interferedWork - previousInterferedWork, + cpuUsage, + cpuDemand, + instanceCount, + powerDraw, + uptime - previousUptime, + downtime - previousDowntime, ) ) - } - - lastHostMetrics = hostMetrics - } - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { - val points = data.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) + previousTotalWork = totalWork + previousGrantedWork = grantedWork + previousOvercommittedWork = overcommittedWork + previousInterferedWork = interferedWork + previousUptime = uptime + previousDowntime = downtime + reset() } - } - private fun mapDoubleHistogram(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { - val points = data.doubleHistogramData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.sum / point.count) + /** + * Accept the [MetricData] for this host. + */ + fun accept(data: MetricData) { + when (data.name) { + "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value + "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value + "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value + "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value + "power.usage" -> powerDraw = acceptHistogram(data) + "cpu.usage" -> cpuUsage = acceptHistogram(data) + "cpu.demand" -> cpuDemand = acceptHistogram(data) + "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt() + "host.time.up" -> uptime = data.longSumData.points.first().value + "host.time.down" -> downtime = data.longSumData.points.first().value + } } - } - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) + private fun acceptHistogram(data: MetricData): Double { + return when (data.type) { + MetricDataType.HISTOGRAM -> { + val point = data.doubleHistogramData.points.first() + point.sum / point.count + } + MetricDataType.SUMMARY -> { + val point = data.doubleSummaryData.points.first() + point.sum / point.count + } + else -> error("Invalid metric type") + } } - } - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) + private fun reset() { + totalWork = 0.0 + grantedWork = 0.0 + overcommittedWork = 0.0 + interferedWork = 0.0 + cpuUsage = 0.0 + cpuDemand = 0.0 + instanceCount = 0 + powerDraw = 0.0 + uptime = 0L + downtime = 0L } } /** - * A buffer for host metrics before they are reported. + * An aggregator for server metrics before they are reported. */ - private class HBuffer { - var totalWork: Double = 0.0 - var grantedWork: Double = 0.0 - var overcommittedWork: Double = 0.0 - var interferedWork: Double = 0.0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var instanceCount: Int = 0 - var powerDraw: Double = 0.0 - var uptime: Long = 0 - var downtime: Long = 0 - } + private class ServerAggregator(attributes: Attributes) { + /** + * The static information about this server. + */ + val server = ServerInfo( + attributes[ResourceAttributes.HOST_ID]!!, + attributes[ResourceAttributes.HOST_NAME]!!, + attributes[ResourceAttributes.HOST_TYPE]!!, + attributes[ResourceAttributes.HOST_ARCH]!!, + attributes[ResourceAttributes.HOST_IMAGE_ID]!!, + attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, + attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), + attributes[AttributeKey.longKey("host.mem_capacity")]!!, + ) - private fun reportServiceMetrics(metrics: Collection<MetricData>) { - monitor.record(extractServiceMetrics(clock.millis(), metrics)) - } + /** + * The [HostInfo] of the host on which the server is hosted. + */ + var host: HostInfo? = null - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + @JvmField var uptime: Long = 0 + private var previousUptime = 0L + @JvmField var downtime: Long = 0 + private var previousDowntime = 0L + @JvmField var schedulingLatency = 0.0 - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + fun record(monitor: ComputeMonitor, now: Long) { + monitor.record( + ServerData( + now, + server, + null, + uptime - previousUptime, + downtime - previousDowntime, + ) + ) + + previousUptime = uptime + previousDowntime = downtime + reset() + } + + private fun reset() { + host = null + uptime = 0L + downtime = 0L + } + } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index ec303b37..d51bcab4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,10 +22,6 @@ package org.opendc.telemetry.compute -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData @@ -35,16 +31,6 @@ import org.opendc.telemetry.compute.table.ServiceData */ public interface ComputeMonitor { /** - * This method is invoked when the state of a [Server] changes. - */ - public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a [Host] changes. - */ - public fun onStateChange(time: Long, host: Host, newState: HostState) {} - - /** * Record the specified [data]. */ public fun record(data: ServerData) {} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 01df0e69..1f309f1b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,51 +24,7 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import java.time.Clock -import java.time.Duration - -/** - * Attach the specified monitor to the OpenDC Compute service. - */ -public suspend fun withMonitor( - scheduler: ComputeService, - clock: Clock, - metricProducer: MetricProducer, - monitor: ComputeMonitor, - exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */ - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.onStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.onStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor), - exportInterval - ) - - try { - block(this) - } finally { - reader.close() - } -} /** * Collect the metrics of the compute service. diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt new file mode 100644 index 00000000..7dca6186 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("HostAttributes") +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey + +/** + * The identifier of the node hosting virtual machines. + */ +public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id") + +/** + * The name of the node hosting virtual machines. + */ +public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name") + +/** + * The CPU architecture of the host node. + */ +public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch") + +/** + * The number of CPUs in the host node. + */ +public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus") + +/** + * The amount of memory installed in the host node in MiB. + */ +public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index 8e6c34d0..e3ecda3d 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -22,14 +22,12 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.service.driver.Host - /** * A trace entry for a particular host. */ public data class HostData( public val timestamp: Long, - public val host: Host, + public val host: HostInfo, public val totalWork: Double, public val grantedWork: Double, public val overcommittedWork: Double, diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt new file mode 100644 index 00000000..d9a5906b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Information about a host exposed to the telemetry service. + */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 2a9fa8a6..7fde86d9 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -22,14 +22,13 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.api.Server - /** * A trace entry for a particular server. */ public data class ServerData( public val timestamp: Long, - public val server: Server, + public val server: ServerInfo, + public val host: HostInfo?, public val uptime: Long, public val downtime: Long, ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt new file mode 100644 index 00000000..b16e5f3d --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b565e90d..b9d5a3f5 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.env.MachineDef @@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario import org.opendc.web.client.model.Topology import java.io.File import java.net.URI +import java.time.Duration import java.util.* -import kotlin.random.Random -import kotlin.random.asJavaRandom import org.opendc.web.client.model.Portfolio as ClientPortfolio private val logger = KotlinLogging.logger {} @@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") { try { runBlockingSimulation { - val seed = repeat val workloadName = scenario.trace.traceId val workloadFraction = scenario.trace.loadSamplingFraction - val seeder = Random(seed) + val seeder = Random(repeat.toLong()) val meterProvider: MeterProvider = SdkMeterProvider .builder() .setClock(clock.toOtelClock()) .build() - val metricProducer = meterProvider as MetricProducer val operational = scenario.operationalPhenomena - val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder) + val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) val trace = ParquetTraceReader( listOf(traceReader), Workload(workloadName, workloadFraction), - seed + repeat ) - val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0 - - withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> - val faultInjector = if (failureFrequency > 0) { - logger.debug { "ENABLING failures" } - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - failureFrequency, - ) - } else { + val failureModel = + if (operational.failuresEnabled) + grid5000(Duration.ofDays(7), repeat) + else null - } - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + interferenceModel.takeIf { operational.performanceInterferenceEnabled } + ) - processTrace( - clock, - trace, - scheduler, - monitor - ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - faultInjector?.close() - } + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index c8e58dde..4b813310 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -22,27 +22,19 @@ package org.opendc.web.runner -import mu.KotlinLogging -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServiceData import kotlin.math.max +import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ -public class WebComputeMonitor : ComputeMonitor { - private val logger = KotlinLogging.logger {} - - override fun onStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - +class WebComputeMonitor : ComputeMonitor { override fun record(data: HostData) { - val duration = 5 * 60 * 1000L - val slices = duration / SLICE_LENGTH + val duration = data.uptime + val slices = data.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( hostAggregateMetrics.totalWork + data.totalWork, @@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices ) - hostMetrics.compute(data.host) { _, prev -> + hostMetrics.compute(data.host.id) { _, prev -> HostMetrics( - (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), data.instanceCount + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf() private val SLICE_LENGTH: Long = 5 * 60 * 1000 data class AggregateHostMetrics( @@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor { val totalOvercommittedWork: Double = 0.0, val totalInterferedWork: Double = 0.0, val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, ) data class HostMetrics( @@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor { ) } - public data class AggregateServiceMetrics( + data class AggregateServiceMetrics( val vmTotalCount: Int = 0, val vmWaitingCount: Int = 0, val vmActiveCount: Int = 0, @@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor { val vmFailedCount: Int = 0 ) - public fun getResult(): Result { + fun getResult(): Result { return Result( hostAggregateMetrics.totalWork, hostAggregateMetrics.totalGrantedWork, @@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor { hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), serviceMetrics.vmTotalCount, serviceMetrics.vmWaitingCount, serviceMetrics.vmInactiveCount, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index d3358ef1..a0248a93 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. - * @param meter The meter to use. + * @param meterProvider The meter provider to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meter, + meterProvider, compute, mode, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 5329143d..a0fd3fad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.flow.map import mu.KotlinLogging @@ -48,7 +49,7 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -67,6 +68,11 @@ public class WorkflowServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to collect metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.workflow.service") + + /** * The incoming jobs ready to be processed by the scheduler. */ internal val incomingJobs: MutableSet<JobState> = linkedSetOf() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 07433d1f..74316437 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.time.Duration import java.util.* /** @@ -79,24 +80,23 @@ internal class WorkflowServiceTest { emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + MeterProvider.noop(), hvProvider, ) } - val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) hosts.forEach { compute.addHost(it) } val scheduler = WorkflowService( coroutineContext, clock, - meterProvider.get("opendc-workflow"), + meterProvider, compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, |
