diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-10-30 17:35:06 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-30 17:35:06 +0100 |
| commit | 7511fb768fab68d542adf5bbfb15e32300156c7e (patch) | |
| tree | 959736689bff655be4ea7e6cc92aaec60ca74f1a /opendc-compute/opendc-compute-simulator | |
| parent | 2325c62377e7c94e768a22807e107a9714626bfc (diff) | |
Added power sources to OpenDC (#258)
* Added power sources to OpenDC.
In the current form each Cluster has a single power source that is connected to all hosts in that cluster
* Added power sources to OpenDC.
In the current form each Cluster has a single power source that is connected to all hosts in that cluster
* Ran spotless Kotlin and Java
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
17 files changed, 1014 insertions, 534 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index 84e23516..f6447573 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -50,6 +50,7 @@ import org.opendc.compute.simulator.host.HostState; import org.opendc.compute.simulator.host.SimHost; import org.opendc.compute.simulator.scheduler.ComputeScheduler; import org.opendc.compute.simulator.telemetry.SchedulerStats; +import org.opendc.simulator.compute.power.SimPowerSource; import org.opendc.simulator.compute.workload.Workload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +99,11 @@ public final class ComputeService implements AutoCloseable { private final Set<HostView> availableHosts = new HashSet<>(); /** + * The available powerSources + */ + private final Set<SimPowerSource> powerSources = new HashSet<>(); + + /** * The tasks that should be launched by the service. */ private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>(); @@ -283,6 +289,15 @@ public final class ComputeService implements AutoCloseable { host.addListener(hostListener); } + public void addPowerSource(SimPowerSource simPowerSource) { + // Check if host is already known + if (powerSources.contains(simPowerSource)) { + return; + } + + powerSources.add(simPowerSource); + } + /** * Remove a {@link SimHost} from the scheduling pool of the compute service. */ @@ -313,6 +328,10 @@ public final class ComputeService implements AutoCloseable { return this.clock; } + public Set<SimPowerSource> getPowerSources() { + return Collections.unmodifiableSet(this.powerSources); + } + /** * Collect the statistics about the scheduler component of this service. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index 31ff384c..32fcf277 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -31,6 +31,7 @@ import org.opendc.compute.simulator.telemetry.GuestCpuStats import org.opendc.compute.simulator.telemetry.GuestSystemStats import org.opendc.compute.simulator.telemetry.HostCpuStats import org.opendc.compute.simulator.telemetry.HostSystemStats +import org.opendc.simulator.Multiplexer import org.opendc.simulator.compute.cpu.CpuPowerModel import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.models.MachineModel @@ -61,6 +62,7 @@ public class SimHost( private val graph: FlowGraph, private val machineModel: MachineModel, private val powerModel: CpuPowerModel, + private val powerMux: Multiplexer, ) : AutoCloseable { /** * The event listeners registered with this host. @@ -130,6 +132,7 @@ public class SimHost( this.graph, this.machineModel, this.powerModel, + this.powerMux, ) { cause -> hostState = if (cause != null) HostState.ERROR else HostState.DOWN } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 07db3d26..d8bb703e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -27,6 +27,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec import java.time.Duration @@ -74,7 +75,7 @@ public fun registerComputeMonitor( */ public fun setupHosts( serviceDomain: String, - specs: List<HostSpec>, + specs: List<ClusterSpec>, ): ProvisioningStep { return HostsProvisioningStep(serviceDomain, specs) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 19674d5e..d2231f0d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -24,7 +24,10 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.simulator.host.SimHost import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec +import org.opendc.simulator.Multiplexer +import org.opendc.simulator.compute.power.SimPowerSource import org.opendc.simulator.engine.FlowEngine /** @@ -36,37 +39,56 @@ import org.opendc.simulator.engine.FlowEngine */ public class HostsProvisioningStep internal constructor( private val serviceDomain: String, - private val specs: List<HostSpec>, + private val clusterSpecs: List<ClusterSpec>, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull( ctx.registry.resolve(serviceDomain, ComputeService::class.java), ) { "Compute service $serviceDomain does not exist" } - val hosts = mutableSetOf<SimHost>() + val simHosts = mutableSetOf<SimHost>() + val simPowerSources = mutableListOf<SimPowerSource>() - val flowEngine = FlowEngine.create(ctx.dispatcher) - val flowGraph = flowEngine.newGraph() + val engine = FlowEngine.create(ctx.dispatcher) + val graph = engine.newGraph() - for (spec in specs) { - val host = - SimHost( - spec.uid, - spec.name, - spec.meta, - ctx.dispatcher.timeSource, - flowGraph, - spec.model, - spec.cpuPowerModel, - ) + for (cluster in clusterSpecs) { + // Create the Power Source to which hosts are connected + // TODO: Add connection to totalPower + val simPowerSource = SimPowerSource(graph) + service.addPowerSource(simPowerSource) + simPowerSources.add(simPowerSource) - require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } - service.addHost(host) + val powerMux = Multiplexer(graph) + graph.addEdge(powerMux, simPowerSource) + + // Create hosts, they are connected to the powerMux when SimMachine is created + for (hostSpec in cluster.hostSpecs) { + val simHost = + SimHost( + hostSpec.uid, + hostSpec.name, + hostSpec.meta, + ctx.dispatcher.timeSource, + graph, + hostSpec.model, + hostSpec.cpuPowerModel, + powerMux, + ) + + require(simHosts.add(simHost)) { "Host with uid ${hostSpec.uid} already exists" } + service.addHost(simHost) + } } return AutoCloseable { - for (host in hosts) { - host.close() + for (simHost in simHosts) { + simHost.close() + } + + for (simPowerSource in simPowerSources) { + // TODO: add close function +// simPowerSource.close() } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt index c84b2a3f..fb7c8f89 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt @@ -29,18 +29,16 @@ import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.common.Dispatcher import org.opendc.common.asCoroutineDispatcher -import org.opendc.compute.api.TaskState import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.simulator.host.SimHost import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.service.ServiceTask -import org.opendc.compute.simulator.telemetry.table.HostInfo -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.ServiceTableReader -import org.opendc.compute.simulator.telemetry.table.TaskInfo -import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.compute.simulator.telemetry.table.HostTableReaderImpl +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReaderImpl +import org.opendc.compute.simulator.telemetry.table.ServiceTableReaderImpl +import org.opendc.compute.simulator.telemetry.table.TaskTableReaderImpl +import org.opendc.simulator.compute.power.SimPowerSource import java.time.Duration -import java.time.Instant /** * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every @@ -75,16 +73,21 @@ public class ComputeMetricReader( private var loggCounter = 0 /** - * Mapping from [Host] instances to [HostTableReaderImpl] + * Mapping from [SimHost] instances to [HostTableReaderImpl] */ private val hostTableReaders = mutableMapOf<SimHost, HostTableReaderImpl>() /** - * Mapping from [Task] instances to [TaskTableReaderImpl] + * Mapping from [ServiceTask] instances to [TaskTableReaderImpl] */ private val taskTableReaders = mutableMapOf<ServiceTask, TaskTableReaderImpl>() /** + * Mapping from [SimPowerSource] instances to [PowerSourceTableReaderImpl] + */ + private val powerSourceTableReaders = mutableMapOf<SimPowerSource, PowerSourceTableReaderImpl>() + + /** * The background job that is responsible for collecting the metrics every cycle. */ private val job = @@ -143,6 +146,21 @@ public class ComputeMetricReader( } this.service.clearTasksToRemove() + for (simPowerSource in this.service.powerSources) { + val reader = + this.powerSourceTableReaders.computeIfAbsent(simPowerSource) { + PowerSourceTableReaderImpl( + it, + startTime, + carbonTrace, + ) + } + + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + this.serviceTableReader.record(now) monitor.record(this.serviceTableReader.copy()) @@ -165,500 +183,4 @@ public class ComputeMetricReader( override fun close() { job.cancel() } - - /** - * An aggregator for service metrics before they are reported. - */ - private class ServiceTableReaderImpl( - private val service: ComputeService, - private val startTime: Duration = Duration.ofMillis(0), - ) : ServiceTableReader { - override fun copy(): ServiceTableReader { - val newServiceTable = - ServiceTableReaderImpl( - service, - ) - newServiceTable.setValues(this) - - return newServiceTable - } - - override fun setValues(table: ServiceTableReader) { - _timestamp = table.timestamp - _timestampAbsolute = table.timestampAbsolute - - _hostsUp = table.hostsUp - _hostsDown = table.hostsDown - _tasksTotal = table.tasksTotal - _tasksPending = table.tasksPending - _tasksActive = table.tasksActive - _tasksCompleted = table.tasksCompleted - _tasksTerminated = table.tasksTerminated - _attemptsSuccess = table.attemptsSuccess - _attemptsFailure = table.attemptsFailure - } - - private var _timestamp: Instant = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - private var _timestampAbsolute: Instant = Instant.MIN - override val timestampAbsolute: Instant - get() = _timestampAbsolute - - override val hostsUp: Int - get() = _hostsUp - private var _hostsUp = 0 - - override val hostsDown: Int - get() = _hostsDown - private var _hostsDown = 0 - - override val tasksTotal: Int - get() = _tasksTotal - private var _tasksTotal = 0 - - override val tasksPending: Int - get() = _tasksPending - private var _tasksPending = 0 - - override val tasksCompleted: Int - get() = _tasksCompleted - private var _tasksCompleted = 0 - - override val tasksActive: Int - get() = _tasksActive - private var _tasksActive = 0 - - override val tasksTerminated: Int - get() = _tasksTerminated - private var _tasksTerminated = 0 - - override val attemptsSuccess: Int - get() = _attemptsSuccess - private var _attemptsSuccess = 0 - - override val attemptsFailure: Int - get() = _attemptsFailure - private var _attemptsFailure = 0 - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - _timestamp = now - _timestampAbsolute = now + startTime - - val stats = service.getSchedulerStats() - _hostsUp = stats.hostsAvailable - _hostsDown = stats.hostsUnavailable - _tasksTotal = stats.tasksTotal - _tasksPending = stats.tasksPending - _tasksCompleted = stats.tasksCompleted - _tasksActive = stats.tasksActive - _tasksTerminated = stats.tasksTerminated - _attemptsSuccess = stats.attemptsSuccess.toInt() - _attemptsFailure = stats.attemptsFailure.toInt() - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - private class HostTableReaderImpl( - host: SimHost, - private val startTime: Duration = Duration.ofMillis(0), - private val carbonTrace: CarbonTrace = CarbonTrace(null), - ) : HostTableReader { - override fun copy(): HostTableReader { - val newHostTable = - HostTableReaderImpl(_host) - newHostTable.setValues(this) - - return newHostTable - } - - override fun setValues(table: HostTableReader) { - _timestamp = table.timestamp - _timestampAbsolute = table.timestampAbsolute - - _guestsTerminated = table.guestsTerminated - _guestsRunning = table.guestsRunning - _guestsError = table.guestsError - _guestsInvalid = table.guestsInvalid - _cpuLimit = table.cpuLimit - _cpuDemand = table.cpuDemand - _cpuUsage = table.cpuUsage - _cpuUtilization = table.cpuUtilization - _cpuActiveTime = table.cpuActiveTime - _cpuIdleTime = table.cpuIdleTime - _cpuStealTime = table.cpuStealTime - _cpuLostTime = table.cpuLostTime - _powerDraw = table.powerDraw - _energyUsage = table.energyUsage - _carbonIntensity = table.carbonIntensity - _carbonEmission = table.carbonEmission - _uptime = table.uptime - _downtime = table.downtime - _bootTime = table.bootTime - _bootTimeAbsolute = table.bootTimeAbsolute - } - - private val _host = host - - override val host: HostInfo = - HostInfo( - host.getUid().toString(), - host.getName(), - "x86", - host.getModel().coreCount, - host.getModel().cpuCapacity, - host.getModel().memoryCapacity, - ) - - override val timestamp: Instant - get() = _timestamp - private var _timestamp = Instant.MIN - - override val timestampAbsolute: Instant - get() = _timestampAbsolute - private var _timestampAbsolute = Instant.MIN - - override val guestsTerminated: Int - get() = _guestsTerminated - private var _guestsTerminated = 0 - - override val guestsRunning: Int - get() = _guestsRunning - private var _guestsRunning = 0 - - override val guestsError: Int - get() = _guestsError - private var _guestsError = 0 - - override val guestsInvalid: Int - get() = _guestsInvalid - private var _guestsInvalid = 0 - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuUsage: Double - get() = _cpuUsage - private var _cpuUsage = 0.0 - - override val cpuDemand: Double - get() = _cpuDemand - private var _cpuDemand = 0.0 - - override val cpuUtilization: Double - get() = _cpuUtilization - private var _cpuUtilization = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val powerDraw: Double - get() = _powerDraw - private var _powerDraw = 0.0 - - override val energyUsage: Double - get() = _energyUsage - previousEnergyUsage - private var _energyUsage = 0.0 - private var previousEnergyUsage = 0.0 - - override val carbonIntensity: Double - get() = _carbonIntensity - private var _carbonIntensity = 0.0 - - override val carbonEmission: Double - get() = _carbonEmission - private var _carbonEmission = 0.0 - - override val uptime: Long - get() = _uptime - previousUptime - private var _uptime = 0L - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime = 0L - private var previousDowntime = 0L - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - override val bootTimeAbsolute: Instant? - get() = _bootTimeAbsolute - private var _bootTimeAbsolute: Instant? = null - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - val hostCpuStats = _host.getCpuStats() - val hostSysStats = _host.getSystemStats() - - _timestamp = now - _timestampAbsolute = now + startTime - - _guestsTerminated = hostSysStats.guestsTerminated - _guestsRunning = hostSysStats.guestsRunning - _guestsError = hostSysStats.guestsError - _guestsInvalid = hostSysStats.guestsInvalid - _cpuLimit = hostCpuStats.capacity - _cpuDemand = hostCpuStats.demand - _cpuUsage = hostCpuStats.usage - _cpuUtilization = hostCpuStats.utilization - _cpuActiveTime = hostCpuStats.activeTime - _cpuIdleTime = hostCpuStats.idleTime - _cpuStealTime = hostCpuStats.stealTime - _cpuLostTime = hostCpuStats.lostTime - _powerDraw = hostSysStats.powerDraw - _energyUsage = hostSysStats.energyUsage - _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute) - - _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) // convert energy usage from J to kWh - _uptime = hostSysStats.uptime.toMillis() - _downtime = hostSysStats.downtime.toMillis() - _bootTime = hostSysStats.bootTime - _bootTime = hostSysStats.bootTime + startTime - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - // Reset intermediate state for next aggregation - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - previousEnergyUsage = _energyUsage - previousUptime = _uptime - previousDowntime = _downtime - - _guestsTerminated = 0 - _guestsRunning = 0 - _guestsError = 0 - _guestsInvalid = 0 - - _cpuLimit = 0.0 - _cpuUsage = 0.0 - _cpuDemand = 0.0 - _cpuUtilization = 0.0 - - _powerDraw = 0.0 - _energyUsage = 0.0 - _carbonIntensity = 0.0 - _carbonEmission = 0.0 - } - } - - /** - * An aggregator for task metrics before they are reported. - */ - private class TaskTableReaderImpl( - private val service: ComputeService, - private val task: ServiceTask, - private val startTime: Duration = Duration.ofMillis(0), - ) : TaskTableReader { - override fun copy(): TaskTableReader { - val newTaskTable = - TaskTableReaderImpl( - service, - task, - ) - newTaskTable.setValues(this) - - return newTaskTable - } - - override fun setValues(table: TaskTableReader) { - host = table.host - - _timestamp = table.timestamp - _timestampAbsolute = table.timestampAbsolute - - _cpuLimit = table.cpuLimit - _cpuActiveTime = table.cpuActiveTime - _cpuIdleTime = table.cpuIdleTime - _cpuStealTime = table.cpuStealTime - _cpuLostTime = table.cpuLostTime - _uptime = table.uptime - _downtime = table.downtime - _provisionTime = table.provisionTime - _bootTime = table.bootTime - _bootTimeAbsolute = table.bootTimeAbsolute - - _creationTime = table.creationTime - _finishTime = table.finishTime - - _taskState = table.taskState - } - - /** - * The static information about this task. - */ - override val taskInfo = - TaskInfo( - task.uid.toString(), - task.name, - "vm", - "x86", - task.flavor.coreCount, - task.flavor.memorySize, - ) - - /** - * The [HostInfo] of the host on which the task is hosted. - */ - override var host: HostInfo? = null - private var _host: SimHost? = null - - private var _timestamp = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - private var _timestampAbsolute = Instant.MIN - override val timestampAbsolute: Instant - get() = _timestampAbsolute - - override val uptime: Long - get() = _uptime - previousUptime - private var _uptime: Long = 0 - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime: Long = 0 - private var previousDowntime = 0L - - override val provisionTime: Instant? - get() = _provisionTime - private var _provisionTime: Instant? = null - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - override val creationTime: Instant? - get() = _creationTime - private var _creationTime: Instant? = null - - override val finishTime: Instant? - get() = _finishTime - private var _finishTime: Instant? = null - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val bootTimeAbsolute: Instant? - get() = _bootTimeAbsolute - private var _bootTimeAbsolute: Instant? = null - - override val taskState: TaskState? - get() = _taskState - private var _taskState: TaskState? = null - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - val newHost = service.lookupHost(task) - if (newHost != null && newHost.getUid() != _host?.getUid()) { - _host = newHost - host = - HostInfo( - newHost.getUid().toString(), - newHost.getName(), - "x86", - newHost.getModel().coreCount, - newHost.getModel().cpuCapacity, - newHost.getModel().memoryCapacity, - ) - } - - val cpuStats = _host?.getCpuStats(task) - val sysStats = _host?.getSystemStats(task) - - _timestamp = now - _timestampAbsolute = now + startTime - - _cpuLimit = cpuStats?.capacity ?: 0.0 - _cpuActiveTime = cpuStats?.activeTime ?: 0 - _cpuIdleTime = cpuStats?.idleTime ?: 0 - _cpuStealTime = cpuStats?.stealTime ?: 0 - _cpuLostTime = cpuStats?.lostTime ?: 0 - _uptime = sysStats?.uptime?.toMillis() ?: 0 - _downtime = sysStats?.downtime?.toMillis() ?: 0 - _provisionTime = task.launchedAt - _bootTime = sysStats?.bootTime - _creationTime = task.createdAt - _finishTime = task.finishedAt - - _taskState = task.state - - if (sysStats != null) { - _bootTimeAbsolute = sysStats.bootTime + startTime - } else { - _bootTimeAbsolute = null - } - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - previousUptime = _uptime - previousDowntime = _downtime - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - - _host = null - _cpuLimit = 0.0 - } - } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt index 534bcc09..5e1fe2c9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt @@ -23,6 +23,7 @@ package org.opendc.compute.simulator.telemetry import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader import org.opendc.compute.simulator.telemetry.table.ServiceTableReader import org.opendc.compute.simulator.telemetry.table.TaskTableReader @@ -43,5 +44,10 @@ public interface ComputeMonitor { /** * Record an entry with the specified [reader]. */ + public fun record(reader: PowerSourceTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt index 3f220ad1..691d01c1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt @@ -36,6 +36,7 @@ import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.jsonObject import org.opendc.common.logger.logger import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader import org.opendc.compute.simulator.telemetry.table.ServiceTableReader import org.opendc.compute.simulator.telemetry.table.TaskTableReader import org.opendc.trace.util.parquet.exporter.ColListSerializer @@ -49,21 +50,25 @@ import org.opendc.trace.util.parquet.exporter.columnSerializer * * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. * @param[taskExportColumns] the columns that will be included in the `task.parquet` raw output file. + * @param[powerSourceExportColumns] the columns that will be included in the `power.parquet` raw output file. * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. */ @Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) public data class ComputeExportConfig( public val hostExportColumns: Set<ExportColumn<HostTableReader>>, public val taskExportColumns: Set<ExportColumn<TaskTableReader>>, + public val powerSourceExportColumns: Set<ExportColumn<PowerSourceTableReader>>, public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>, ) { public constructor( hostExportColumns: Collection<ExportColumn<HostTableReader>>, taskExportColumns: Collection<ExportColumn<TaskTableReader>>, + powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>, serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>, ) : this( hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS, taskExportColumns.toSet() + DfltTaskExportColumns.BASE_EXPORT_COLUMNS, + powerSourceExportColumns.toSet() + DfltPowerSourceExportColumns.BASE_EXPORT_COLUMNS, serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS, ) @@ -75,6 +80,7 @@ public data class ComputeExportConfig( | === Compute Export Config === | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} | Task columns : ${taskExportColumns.map { it.name }.toString().trim('[', ']')} + | Power Source columns : ${powerSourceExportColumns.map { it.name }.toString().trim('[', ']')} | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} """.trimIndent() @@ -88,19 +94,21 @@ public data class ComputeExportConfig( public fun loadDfltColumns() { DfltHostExportColumns DfltTaskExportColumns + DfltPowerSourceExportColumns DfltServiceExportColumns } /** - * Config that includes all columns defined in [DfltHostExportColumns], - * [DfltTaskExportColumns], [DfltServiceExportColumns] among all other loaded + * Config that includes all columns defined in [DfltHostExportColumns], [DfltTaskExportColumns], + * [DfltPowerSourceExportColumns], [DfltServiceExportColumns] among all other loaded * columns for [HostTableReader], [TaskTableReader] and [ServiceTableReader]. */ public val ALL_COLUMNS: ComputeExportConfig by lazy { - ComputeExportConfig.Companion.loadDfltColumns() + loadDfltColumns() ComputeExportConfig( hostExportColumns = ExportColumn.getAllLoadedColumns(), taskExportColumns = ExportColumn.getAllLoadedColumns(), + powerSourceExportColumns = ExportColumn.getAllLoadedColumns(), serviceExportColumns = ExportColumn.getAllLoadedColumns(), ) } @@ -122,6 +130,10 @@ public data class ComputeExportConfig( ListSerializer(columnSerializer<TaskTableReader>()).descriptor, ) element( + "powerSourceExportColumns", + ListSerializer(columnSerializer<ServiceTableReader>()).descriptor, + ) + element( "serviceExportColumns", ListSerializer(columnSerializer<ServiceTableReader>()).descriptor, ) @@ -135,16 +147,18 @@ public data class ComputeExportConfig( } // Loads the default columns so that they are available for deserialization. - ComputeExportConfig.Companion.loadDfltColumns() + loadDfltColumns() val elem = jsonDec.decodeJsonElement().jsonObject val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList() val taskFields: List<ExportColumn<TaskTableReader>> = elem["taskExportColumns"].toFieldList() + val powerSourceFields: List<ExportColumn<PowerSourceTableReader>> = elem["powerSourceExportColumns"].toFieldList() val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList() return ComputeExportConfig( hostExportColumns = hostFields, taskExportColumns = taskFields, + powerSourceExportColumns = powerSourceFields, serviceExportColumns = serviceFields, ) } @@ -153,22 +167,28 @@ public data class ComputeExportConfig( encoder: Encoder, value: ComputeExportConfig, ) { - encoder.encodeStructure(ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor) { + encoder.encodeStructure(descriptor) { encodeSerializableElement( - ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + descriptor, 0, ColListSerializer(columnSerializer<HostTableReader>()), value.hostExportColumns.toList(), ) encodeSerializableElement( - ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + descriptor, 1, ColListSerializer(columnSerializer<TaskTableReader>()), value.taskExportColumns.toList(), ) encodeSerializableElement( - ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor, + descriptor, 2, + ColListSerializer(columnSerializer<PowerSourceTableReader>()), + value.powerSourceExportColumns.toList(), + ) + encodeSerializableElement( + descriptor, + 3, ColListSerializer(columnSerializer<ServiceTableReader>()), value.serviceExportColumns.toList(), ) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt new file mode 100644 index 00000000..95db55fe --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltHostExportColumns + * ... + * ``` + */ +public object DfltPowerSourceExportColumns { + public val TIMESTAMP: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val CPU_COUNT: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(INT32).named("hosts_connected"), + ) { it.hostsConnected } + + public val POWER_DRAW: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("power_draw"), + ) { it.powerDraw } + + public val ENERGY_USAGE: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("energy_usage"), + ) { it.energyUsage } + + public val CARBON_INTENSITY: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("carbon_intensity"), + ) { it.carbonIntensity } + + public val CARBON_EMISSION: ExportColumn<PowerSourceTableReader> = + ExportColumn( + field = Types.required(FLOAT).named("carbon_emission"), + ) { it.carbonEmission } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt index 4cd920c4..b3150018 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator.telemetry.parquet import org.opendc.compute.simulator.telemetry.ComputeMonitor import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader import org.opendc.compute.simulator.telemetry.table.ServiceTableReader import org.opendc.compute.simulator.telemetry.table.TaskTableReader import org.opendc.trace.util.parquet.exporter.ExportColumn @@ -37,6 +38,7 @@ import java.io.File public class ParquetComputeMonitor( private val hostExporter: Exporter<HostTableReader>, private val taskExporter: Exporter<TaskTableReader>, + private val powerSourceExporter: Exporter<PowerSourceTableReader>, private val serviceExporter: Exporter<ServiceTableReader>, ) : ComputeMonitor, AutoCloseable { override fun record(reader: HostTableReader) { @@ -47,6 +49,10 @@ public class ParquetComputeMonitor( taskExporter.write(reader) } + override fun record(reader: PowerSourceTableReader) { + powerSourceExporter.write(reader) + } + override fun record(reader: ServiceTableReader) { serviceExporter.write(reader) } @@ -54,6 +60,7 @@ public class ParquetComputeMonitor( override fun close() { hostExporter.close() taskExporter.close() + powerSourceExporter.close() serviceExporter.close() } @@ -77,12 +84,13 @@ public class ParquetComputeMonitor( bufferSize = bufferSize, hostExportColumns = computeExportConfig.hostExportColumns, taskExportColumns = computeExportConfig.taskExportColumns, + powerSourceExportColumns = computeExportConfig.powerSourceExportColumns, serviceExportColumns = computeExportConfig.serviceExportColumns, ) /** * Constructor that loads default [ExportColumn]s defined in - * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltServiceExportColumns] + * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltPowerSourceExportColumns], [DfltServiceExportColumns] * in case optional parameters are omitted and all fields need to be retrieved. * * @param[base] parent pathname for output file. @@ -95,6 +103,7 @@ public class ParquetComputeMonitor( bufferSize: Int, hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null, taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null, + powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>? = null, serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null, ): ParquetComputeMonitor { // Loads the fields in case they need to be retrieved if optional params are omitted. @@ -113,6 +122,12 @@ public class ParquetComputeMonitor( columns = taskExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, ), + powerSourceExporter = + Exporter( + outputFile = File(base, "$partition/powerSource.parquet").also { it.parentFile.mkdirs() }, + columns = powerSourceExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), serviceExporter = Exporter( outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt index a5a862ce..cf8d3c8c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt @@ -33,6 +33,10 @@ public interface HostTableReader : Exportable { public fun setValues(table: HostTableReader) + public fun record(now: Instant) + + public fun reset() + /** * The [HostInfo] of the host to which the row belongs to. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt new file mode 100644 index 00000000..ab8c0036 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.compute.carbon.CarbonTrace +import org.opendc.compute.simulator.host.SimHost +import java.time.Duration +import java.time.Instant + +/** + * An aggregator for host metrics before they are reported. + */ +public class HostTableReaderImpl( + host: SimHost, + private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), +) : HostTableReader { + override fun copy(): HostTableReader { + val newHostTable = + HostTableReaderImpl(_host) + newHostTable.setValues(this) + + return newHostTable + } + + override fun setValues(table: HostTableReader) { + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _guestsTerminated = table.guestsTerminated + _guestsRunning = table.guestsRunning + _guestsError = table.guestsError + _guestsInvalid = table.guestsInvalid + _cpuLimit = table.cpuLimit + _cpuDemand = table.cpuDemand + _cpuUsage = table.cpuUsage + _cpuUtilization = table.cpuUtilization + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _powerDraw = table.powerDraw + _energyUsage = table.energyUsage + _carbonIntensity = table.carbonIntensity + _carbonEmission = table.carbonEmission + _uptime = table.uptime + _downtime = table.downtime + _bootTime = table.bootTime + _bootTimeAbsolute = table.bootTimeAbsolute + } + + private val _host = host + + override val host: HostInfo = + HostInfo( + host.getUid().toString(), + host.getName(), + "x86", + host.getModel().coreCount, + host.getModel().cpuCapacity, + host.getModel().memoryCapacity, + ) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val timestampAbsolute: Instant + get() = _timestampAbsolute + private var _timestampAbsolute = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerDraw: Double + get() = _powerDraw + private var _powerDraw = 0.0 + + override val energyUsage: Double + get() = _energyUsage - previousEnergyUsage + private var _energyUsage = 0.0 + private var previousEnergyUsage = 0.0 + + override val carbonIntensity: Double + get() = _carbonIntensity + private var _carbonIntensity = 0.0 + + override val carbonEmission: Double + get() = _carbonEmission + private var _carbonEmission = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val bootTimeAbsolute: Instant? + get() = _bootTimeAbsolute + private var _bootTimeAbsolute: Instant? = null + + /** + * Record the next cycle. + */ + override fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _timestampAbsolute = now + startTime + + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerDraw = hostSysStats.powerDraw + _energyUsage = hostSysStats.energyUsage + _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute) + + _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) // convert energy usage from J to kWh + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + _bootTime = hostSysStats.bootTime + startTime + } + + /** + * Finish the aggregation for this cycle. + */ + override fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousEnergyUsage = _energyUsage + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerDraw = 0.0 + _energyUsage = 0.0 + _carbonIntensity = 0.0 + _carbonEmission = 0.0 + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt new file mode 100644 index 00000000..cd2b2d2c --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.trace.util.parquet.exporter.Exportable +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface PowerSourceTableReader : Exportable { + public fun copy(): PowerSourceTableReader + + public fun setValues(table: PowerSourceTableReader) + + public fun record(now: Instant) + + public fun reset() + + /** + * The timestamp of the current entry of the reader relative to the start of the workload. + */ + public val timestamp: Instant + + /** + * The timestamp of the current entry of the reader. + */ + public val timestampAbsolute: Instant + + /** + * The number of connected hosts + */ + public val hostsConnected: Int + + /** + * The current power draw of the host in W. + */ + public val powerDraw: Double + + /** + * The total energy consumption of the host since last sample in J. + */ + public val energyUsage: Double + + /** + * The current carbon intensity of the host in gCO2 / kW. + */ + public val carbonIntensity: Double + + /** + * The current carbon emission since the last deadline in g. + */ + public val carbonEmission: Double +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt new file mode 100644 index 00000000..91918ea8 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.compute.carbon.CarbonTrace +import org.opendc.simulator.compute.power.SimPowerSource +import java.time.Duration +import java.time.Instant + +/** + * An aggregator for task metrics before they are reported. + */ +public class PowerSourceTableReaderImpl( + powerSource: SimPowerSource, + private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), +) : PowerSourceTableReader { + override fun copy(): PowerSourceTableReader { + val newPowerSourceTable = + PowerSourceTableReaderImpl( + powerSource, + ) + newPowerSourceTable.setValues(this) + + return newPowerSourceTable + } + + override fun setValues(table: PowerSourceTableReader) { + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _hostsConnected = table.hostsConnected + _powerDraw = table.powerDraw + _energyUsage = table.energyUsage + _carbonIntensity = table.carbonIntensity + _carbonEmission = table.carbonEmission + } + + private val powerSource = powerSource + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + private var _timestampAbsolute = Instant.MIN + override val timestampAbsolute: Instant + get() = _timestampAbsolute + + override val hostsConnected: Int + get() = _hostsConnected + private var _hostsConnected: Int = 0 + + override val powerDraw: Double + get() = _powerDraw + private var _powerDraw = 0.0 + + override val energyUsage: Double + get() = _energyUsage - previousEnergyUsage + private var _energyUsage = 0.0 + private var previousEnergyUsage = 0.0 + + override val carbonIntensity: Double + get() = _carbonIntensity + private var _carbonIntensity = 0.0 + + override val carbonEmission: Double + get() = _carbonEmission + private var _carbonEmission = 0.0 + + /** + * Record the next cycle. + */ + override fun record(now: Instant) { + _timestamp = now + _timestampAbsolute = now + startTime + + _hostsConnected = 0 + _powerDraw = powerSource.powerDraw + _energyUsage = powerSource.energyUsage + _carbonIntensity = 0.0 + _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) + } + + /** + * Finish the aggregation for this cycle. + */ + override fun reset() { + previousEnergyUsage = _energyUsage + + _hostsConnected = 0 + _powerDraw = 0.0 + _energyUsage = 0.0 + _carbonIntensity = 0.0 + _carbonEmission = 0.0 + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt index 690dfe0a..c8cc765a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt @@ -33,6 +33,8 @@ public interface ServiceTableReader : Exportable { public fun setValues(table: ServiceTableReader) + public fun record(now: Instant) + /** * The timestamp of the current entry of the reader. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt new file mode 100644 index 00000000..52a25021 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.compute.simulator.service.ComputeService +import java.time.Duration +import java.time.Instant + +/** + * An aggregator for service metrics before they are reported. + */ +public class ServiceTableReaderImpl( + private val service: ComputeService, + private val startTime: Duration = Duration.ofMillis(0), +) : ServiceTableReader { + override fun copy(): ServiceTableReader { + val newServiceTable = + ServiceTableReaderImpl( + service, + ) + newServiceTable.setValues(this) + + return newServiceTable + } + + override fun setValues(table: ServiceTableReader) { + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _hostsUp = table.hostsUp + _hostsDown = table.hostsDown + _tasksTotal = table.tasksTotal + _tasksPending = table.tasksPending + _tasksActive = table.tasksActive + _tasksCompleted = table.tasksCompleted + _tasksTerminated = table.tasksTerminated + _attemptsSuccess = table.attemptsSuccess + _attemptsFailure = table.attemptsFailure + } + + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + private var _timestampAbsolute: Instant = Instant.MIN + override val timestampAbsolute: Instant + get() = _timestampAbsolute + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val tasksTotal: Int + get() = _tasksTotal + private var _tasksTotal = 0 + + override val tasksPending: Int + get() = _tasksPending + private var _tasksPending = 0 + + override val tasksCompleted: Int + get() = _tasksCompleted + private var _tasksCompleted = 0 + + override val tasksActive: Int + get() = _tasksActive + private var _tasksActive = 0 + + override val tasksTerminated: Int + get() = _tasksTerminated + private var _tasksTerminated = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + /** + * Record the next cycle. + */ + override fun record(now: Instant) { + _timestamp = now + _timestampAbsolute = now + startTime + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _tasksTotal = stats.tasksTotal + _tasksPending = stats.tasksPending + _tasksCompleted = stats.tasksCompleted + _tasksActive = stats.tasksActive + _tasksTerminated = stats.tasksTerminated + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt index 825019e8..50ffa5fc 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt @@ -35,6 +35,10 @@ public interface TaskTableReader : Exportable { public fun setValues(table: TaskTableReader) + public fun record(now: Instant) + + public fun reset() + /** * The timestamp of the current entry of the reader relative to the start of the workload. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt new file mode 100644 index 00000000..5a0897f7 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry.table + +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.host.SimHost +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceTask +import java.time.Duration +import java.time.Instant + +/** + * An aggregator for task metrics before they are reported. + */ +public class TaskTableReaderImpl( + private val service: ComputeService, + private val task: ServiceTask, + private val startTime: Duration = Duration.ofMillis(0), +) : TaskTableReader { + override fun copy(): TaskTableReader { + val newTaskTable = + TaskTableReaderImpl( + service, + task, + ) + newTaskTable.setValues(this) + + return newTaskTable + } + + override fun setValues(table: TaskTableReader) { + host = table.host + + _timestamp = table.timestamp + _timestampAbsolute = table.timestampAbsolute + + _cpuLimit = table.cpuLimit + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _uptime = table.uptime + _downtime = table.downtime + _provisionTime = table.provisionTime + _bootTime = table.bootTime + _bootTimeAbsolute = table.bootTimeAbsolute + + _creationTime = table.creationTime + _finishTime = table.finishTime + + _taskState = table.taskState + } + + /** + * The static information about this task. + */ + override val taskInfo: TaskInfo = + TaskInfo( + task.uid.toString(), + task.name, + "vm", + "x86", + task.flavor.coreCount, + task.flavor.memorySize, + ) + + /** + * The [HostInfo] of the host on which the task is hosted. + */ + override var host: HostInfo? = null + private var _host: SimHost? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + private var _timestampAbsolute = Instant.MIN + override val timestampAbsolute: Instant + get() = _timestampAbsolute + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val creationTime: Instant? + get() = _creationTime + private var _creationTime: Instant? = null + + override val finishTime: Instant? + get() = _finishTime + private var _finishTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val bootTimeAbsolute: Instant? + get() = _bootTimeAbsolute + private var _bootTimeAbsolute: Instant? = null + + override val taskState: TaskState? + get() = _taskState + private var _taskState: TaskState? = null + + /** + * Record the next cycle. + */ + override fun record(now: Instant) { + val newHost = service.lookupHost(task) + if (newHost != null && newHost.getUid() != _host?.getUid()) { + _host = newHost + host = + HostInfo( + newHost.getUid().toString(), + newHost.getName(), + "x86", + newHost.getModel().coreCount, + newHost.getModel().cpuCapacity, + newHost.getModel().memoryCapacity, + ) + } + + val cpuStats = _host?.getCpuStats(task) + val sysStats = _host?.getSystemStats(task) + + _timestamp = now + _timestampAbsolute = now + startTime + + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = task.launchedAt + _bootTime = sysStats?.bootTime + _creationTime = task.createdAt + _finishTime = task.finishedAt + + _taskState = task.state + + if (sysStats != null) { + _bootTimeAbsolute = sysStats.bootTime + startTime + } else { + _bootTimeAbsolute = null + } + } + + /** + * Finish the aggregation for this cycle. + */ + override fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } +} |
