diff options
Diffstat (limited to 'opendc-compute')
29 files changed, 840 insertions, 606 deletions
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index b42c2919..fd15b6e7 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -29,10 +29,8 @@ plugins { dependencies { api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 3a6baaa1..c0b70268 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,8 +22,6 @@ package org.opendc.compute.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host @@ -79,18 +77,16 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, scheduler: ComputeScheduler, schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt index b3958473..6fec5175 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -35,5 +35,5 @@ import java.time.Instant public data class GuestSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant + val bootTime: Instant? ) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt index 1c07023f..9d34a5ce 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -41,7 +41,7 @@ import java.time.Instant public data class HostSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant, + val bootTime: Instant?, val powerUsage: Double, val energyUsage: Double, val guestsTerminated: Int, diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index e8664e5c..21aaa19e 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.common.util.Pacer @@ -49,14 +44,12 @@ import kotlin.math.max * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { @@ -71,11 +64,6 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** - * The [Meter] to track metrics of the [ComputeService]. - */ - private val meter = meterProvider.get("org.opendc.compute.service") - - /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -117,72 +105,20 @@ internal class ComputeServiceImpl( private var maxCores = 0 private var maxMemory = 0L - - /** - * The number of scheduling attempts. - */ - private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") - .setDescription("Number of scheduling attempts") - .setUnit("1") - .build() - private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") - private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") - private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") private var _attemptsSuccess = 0L private var _attemptsFailure = 0L private var _attemptsError = 0L - - /** - * The response time of the service. - */ - private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() - - /** - * The number of servers that are pending. - */ - private val _servers = meter.upDownCounterBuilder("scheduler.servers") - .setDescription("Number of servers managed by the scheduler") - .setUnit("1") - .build() - private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") - private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") private var _serversPending = 0 private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override val hosts: Set<Host> get() = hostToView.keys - init { - val upState = Attributes.of(AttributeKey.stringKey("state"), "up") - val downState = Attributes.of(AttributeKey.stringKey("state"), "down") - - meter.upDownCounterBuilder("scheduler.hosts") - .setDescription("Number of hosts registered with the scheduler") - .setUnit("1") - .buildWithCallback { result -> - val total = hosts.size - val available = availableHosts.size.toLong() - - result.record(available, upState) - result.record(total - available, downState) - } - - meter.gaugeBuilder("system.time.provision") - .setDescription("The most recent timestamp where the server entered a provisioned state") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectProvisionTime) - } - override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -355,7 +291,6 @@ internal class ComputeServiceImpl( server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) _serversPending++ - _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request } @@ -387,14 +322,13 @@ internal class ComputeServiceImpl( /** * Run a single scheduling iteration. */ - private fun doSchedule(now: Long) { + private fun doSchedule() { while (queue.isNotEmpty()) { val request = queue.peek() if (request.isCancelled) { queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) continue } @@ -407,9 +341,7 @@ internal class ComputeServiceImpl( // Remove the incoming image queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) _attemptsFailure++ - _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -425,8 +357,6 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) - _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -442,10 +372,8 @@ internal class ComputeServiceImpl( host.spawn(server) activeServers[server] = host - _servers.add(1, _serversActiveAttr) _serversActive++ _attemptsSuccess++ - _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -454,7 +382,6 @@ internal class ComputeServiceImpl( hv.availableMemory += server.flavor.memorySize _attemptsError++ - _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } } @@ -511,7 +438,6 @@ internal class ComputeServiceImpl( if (activeServers.remove(server) != null) { _serversActive-- - _servers.add(-1, _serversActiveAttr) } val hv = hostToView[host] @@ -527,14 +453,4 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } - - /** - * Collect the timestamp when each server entered its provisioning state most recently. - */ - private fun collectProvisionTime(result: ObservableLongMeasurement) { - for ((_, server) in servers) { - val launchedAt = server.launchedAt ?: continue - result.record(launchedAt.toEpochMilli(), server.attributes) - } - } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d2a2d896..f9da24d8 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,9 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -54,21 +51,6 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** - * The attributes of a server. - */ - @JvmField internal val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.HOST_NAME, name) - .put(ResourceAttributes.HOST_ID, uid.toString()) - .put(ResourceAttributes.HOST_TYPE, flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) - .build() - - /** * The [Host] that has been assigned to host the server. */ @JvmField internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index eb106817..cc7be4a8 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -59,7 +58,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) + service = ComputeService(scope.coroutineContext, clock, computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index e81d87ec..72962147 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -32,11 +32,8 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) implementation(projects.opendcCommon) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 323ae4fe..c28239b4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -67,7 +61,6 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, engine: FlowEngine, - meterProvider: MeterProvider, hypervisorProvider: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -86,11 +79,6 @@ public class SimHost( private val clock = engine.clock /** - * The [Meter] to track metrics of the simulated host. - */ - private val meter = meterProvider.get("org.opendc.compute.simulator") - - /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() @@ -142,48 +130,6 @@ public class SimHost( init { launch() - - meter.upDownCounterBuilder("system.guests") - .setDescription("Number of guests on this host") - .setUnit("1") - .buildWithCallback(::collectGuests) - meter.gaugeBuilder("system.cpu.limit") - .setDescription("Amount of CPU resources available to the host") - .buildWithCallback(::collectCpuLimit) - meter.gaugeBuilder("system.cpu.demand") - .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuDemand) } - meter.gaugeBuilder("system.cpu.usage") - .setDescription("Amount of CPU resources used by the host") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage) } - meter.gaugeBuilder("system.cpu.utilization") - .setDescription("Utilization of the CPU resources of the host") - .setUnit("%") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage / _cpuLimit) } - meter.counterBuilder("system.cpu.time") - .setDescription("Amount of CPU time spent by the host") - .setUnit("s") - .buildWithCallback(::collectCpuTime) - meter.gaugeBuilder("system.power.usage") - .setDescription("Power usage of the host ") - .setUnit("W") - .buildWithCallback { result -> result.record(machine.powerUsage) } - meter.counterBuilder("system.power.total") - .setDescription("Amount of energy used by the CPU") - .setUnit("J") - .ofDoubles() - .buildWithCallback { result -> result.record(machine.energyUsage) } - meter.counterBuilder("system.time") - .setDescription("The uptime of the host") - .setUnit("s") - .buildWithCallback(::collectUptime) - meter.gaugeBuilder("system.time.boot") - .setDescription("The boot time of the host") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -278,7 +224,7 @@ public class SimHost( return HostSystemStats( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime), + _bootTime, machine.powerUsage, machine.energyUsage, terminated, @@ -358,7 +304,7 @@ public class SimHost( _ctx = machine.startWorkload(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { try { - _bootTime = clock.millis() + _bootTime = clock.instant() _state = HostState.UP hypervisor.onStart(ctx) } catch (cause: Throwable) { @@ -422,80 +368,11 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private val STATE_KEY = AttributeKey.stringKey("state") - - private val terminatedState = Attributes.of(STATE_KEY, "terminated") - private val runningState = Attributes.of(STATE_KEY, "running") - private val errorState = Attributes.of(STATE_KEY, "error") - private val invalidState = Attributes.of(STATE_KEY, "invalid") - - /** - * Helper function to collect the guest counts on this host. - */ - private fun collectGuests(result: ObservableLongMeasurement) { - var terminated = 0L - var running = 0L - var error = 0L - var invalid = 0L - - val guests = _guests.listIterator() - for (guest in guests) { - when (guest.state) { - ServerState.TERMINATED -> terminated++ - ServerState.RUNNING -> running++ - ServerState.ERROR -> error++ - ServerState.DELETED -> { - // Remove guests that have been deleted - this.guests.remove(guest.server) - guests.remove() - } - else -> invalid++ - } - } - - result.record(terminated, terminatedState) - result.record(running, runningState) - result.record(error, errorState) - result.record(invalid, invalidState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. - */ - private fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuLimit(result) - } - } - - private val _activeState = Attributes.of(STATE_KEY, "active") - private val _stealState = Attributes.of(STATE_KEY, "steal") - private val _lostState = Attributes.of(STATE_KEY, "lost") - private val _idleState = Attributes.of(STATE_KEY, "idle") - - /** - * Helper function to track the CPU time of a machine. - */ - private fun collectCpuTime(result: ObservableLongMeasurement) { - val stats = getCpuStats() - - result.record(stats.activeTime, _activeState) - result.record(stats.idleTime, _idleState) - result.record(stats.stealTime, _stealState) - result.record(stats.lostTime, _lostState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuTime(result) - } - } - private var _lastReport = clock.millis() + private var _uptime = 0L + private var _downtime = 0L + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime of a machine. @@ -517,40 +394,4 @@ public class SimHost( guests[i].updateUptime() } } - - private var _uptime = 0L - private var _downtime = 0L - private val _upState = Attributes.of(STATE_KEY, "up") - private val _downState = Attributes.of(STATE_KEY, "down") - - /** - * Helper function to track the uptime of a machine. - */ - private fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectUptime(result) - } - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of a machine. - */ - private fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime) - } - - val guests = _guests - for (i in guests.indices) { - guests[i].collectBootTime(result) - } - } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 0d4c550d..ea3c6549 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.common.AttributesBuilder -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server @@ -77,11 +71,6 @@ internal class Guest( var state: ServerState = ServerState.TERMINATED /** - * The attributes of the guest. - */ - val attributes: Attributes = GuestAttributes(this) - - /** * Start the guest. */ suspend fun start() { @@ -158,7 +147,7 @@ internal class Guest( return GuestSystemStats( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime) + _bootTime ) } @@ -235,7 +224,7 @@ internal class Guest( * This method is invoked when the guest was started on the host and has booted into a running state. */ private fun onStart() { - _bootTime = clock.millis() + _bootTime = clock.instant() state = ServerState.RUNNING listener.onStart(this) } @@ -250,18 +239,11 @@ internal class Guest( listener.onStop(this) } - private val STATE_KEY = AttributeKey.stringKey("state") - private var _uptime = 0L private var _downtime = 0L - private val _upState = attributes.toBuilder() - .put(STATE_KEY, "up") - .build() - private val _downState = attributes.toBuilder() - .put(STATE_KEY, "down") - .build() - private var _lastReport = clock.millis() + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime and downtime of the guest. @@ -277,122 +259,4 @@ internal class Guest( _downtime += duration } } - - /** - * Helper function to track the uptime of the guest. - */ - fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of the guest. - */ - fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime, attributes) - } - } - - private val _activeState = attributes.toBuilder() - .put(STATE_KEY, "active") - .build() - private val _stealState = attributes.toBuilder() - .put(STATE_KEY, "steal") - .build() - private val _lostState = attributes.toBuilder() - .put(STATE_KEY, "lost") - .build() - private val _idleState = attributes.toBuilder() - .put(STATE_KEY, "idle") - .build() - - /** - * Helper function to track the CPU time of a machine. - */ - fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = machine.counters - counters.flush() - - result.record(counters.cpuActiveTime / 1000, _activeState) - result.record(counters.cpuIdleTime / 1000, _idleState) - result.record(counters.cpuStealTime / 1000, _stealState) - result.record(counters.cpuLostTime / 1000, _lostState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. - */ - fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit, attributes) - } - - /** - * An optimized [Attributes] implementation. - */ - private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { - /** - * Construct a [GuestAttributes] instance from a [Guest]. - */ - constructor(guest: Guest) : this( - guest.server.uid.toString(), - Attributes.builder() - .put(ResourceAttributes.HOST_NAME, guest.server.name) - .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) - .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) - .build() - ) - - override fun <T : Any?> get(key: AttributeKey<T>): T? { - // Optimize access to the HOST_ID key which is accessed quite often - if (key == ResourceAttributes.HOST_ID) { - @Suppress("UNCHECKED_CAST") - return uid as T? - } - return attributes.get(key) - } - - override fun toBuilder(): AttributesBuilder { - val delegate = attributes.toBuilder() - return object : AttributesBuilder { - - override fun putAll(attributes: Attributes): AttributesBuilder { - delegate.putAll(attributes) - return this - } - - override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder { - delegate.put<T>(key, value) - return this - } - - override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder { - delegate.put(key, value) - return this - } - - override fun build(): Attributes = GuestAttributes(uid, delegate.build()) - } - } - - override fun equals(other: Any?): Boolean = attributes == other - - // Cache hash code - private val _hash = attributes.hashCode() - - override fun hashCode(): Int = _hash - } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index fd54ad1d..5ba4a667 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -75,7 +74,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -158,7 +156,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 319b2ae3..7b5fe6c1 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -34,9 +34,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 21cfdad2..fddb4890 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -30,7 +30,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.telemetry.TelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -46,7 +45,6 @@ import kotlin.math.max * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. - * @param telemetry Helper class for managing telemetry. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. @@ -55,7 +53,6 @@ import kotlin.math.max public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, - private val telemetry: TelemetryManager, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, @@ -167,7 +164,6 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val meterProvider = telemetry.createMeterProvider(spec) val host = SimHost( spec.uid, spec.name, @@ -175,7 +171,6 @@ public class ComputeServiceHelper( spec.meta, context, _engine, - meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, interferenceDomain = interferenceModel?.newDomain(), @@ -202,7 +197,6 @@ public class ComputeServiceHelper( * Construct a [ComputeService] instance. */ private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - val meterProvider = telemetry.createMeterProvider(scheduler) - return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeService(context, clock, scheduler, schedulingQuantum) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index 6c515118..af4dad44 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,10 +22,10 @@ package org.opendc.compute.workload.export.parquet -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 0d5b6b34..e6e7e42d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 5d11629b..082c7c88 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 5ad3b95e..2a0fdca1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,12 +22,11 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.context.ContextKey.named import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..45bd9ab1 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.workload.telemetry.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List<Server>, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt index b67050ce..36a2079a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt @@ -22,21 +22,26 @@ package org.opendc.compute.workload.telemetry -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader /** - * Helper class to manage the telemetry for a [ComputeServiceHelper] instance. + * A monitor that tracks the metrics and events of the OpenDC Compute service. */ -public interface TelemetryManager { +public interface ComputeMonitor { /** - * Construct a [MeterProvider] for the specified [ComputeScheduler]. + * Record an entry with the specified [reader]. */ - public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider + public fun record(reader: ServerTableReader) {} /** - * Construct a [MeterProvider] for the specified [HostSpec]. + * Record an entry with the specified [reader]. */ - public fun createMeterProvider(host: HostSpec): MeterProvider + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt deleted file mode 100644 index 478c0609..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock - -/** - * A [TelemetryManager] using the OpenTelemetry Java SDK. - */ -public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable { - /** - * The [SdkMeterProvider]s that belong to the workload runner. - */ - private val _meterProviders = mutableListOf<SdkMeterProvider>() - - /** - * The internal [MetricProducer] registered with the runner. - */ - private val _metricProducers = mutableListOf<MetricProducer>() - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf<MetricReader>() - - /** - * A [MetricProducer] that combines all the other metric producers. - */ - public val metricProducer: MetricProducer = object : MetricProducer { - private val producers = _metricProducers - - override fun collectAllMetrics(): Collection<MetricData> = producers.flatMap(MetricProducer::collectAllMetrics) - - override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer" - } - - /** - * Register a [MetricReader] for this manager. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - return createMeterProvider(resource) - } - - override fun createMeterProvider(host: HostSpec): MeterProvider { - val resource = Resource.builder() - .put(HOST_ID, host.uid.toString()) - .put(HOST_NAME, host.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, host.model.cpus.size) - .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size }) - .build() - - return createMeterProvider(resource) - } - - /** - * Construct a [SdkMeterProvider] for the specified [resource]. - */ - private fun createMeterProvider(resource: Resource): SdkMeterProvider { - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - _metricProducers.add(producer) - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - } - } - .build() - _meterProviders.add(meterProvider) - return meterProvider - } - - override fun close() { - for (meterProvider in _meterProviders) { - meterProvider.close() - } - - _meterProviders.clear() - - for (metricReader in _metricReaders) { - metricReader.shutdown() - } - - _metricReaders.clear() - _metricProducers.clear() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt index 4e7d0b75..5d383e40 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec +package org.opendc.compute.workload.telemetry.table /** - * A [TelemetryManager] that does nothing. + * Information about a host exposed to the telemetry service. */ -public class NoopTelemetryManager : TelemetryManager { - override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop() - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop() -} +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..8f6f0d01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt new file mode 100644 index 00000000..111135b7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt new file mode 100644 index 00000000..bccccd01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..a1df6ea7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..4211ab15 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. + */ + public val attemptsError: Int +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt index dae03513..4344bb08 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -25,8 +25,8 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.HostTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt index 280f5ef8..8465871d 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.ServerInfo -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.ServerInfo +import org.opendc.compute.workload.telemetry.table.ServerTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt index 7ffa7186..d91982bc 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.nio.file.Files import java.time.Instant |
