summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java19
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt60
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt532
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt36
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt91
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt17
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt240
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt74
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt116
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt123
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt218
17 files changed, 1014 insertions, 534 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
index 84e23516..f6447573 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
@@ -50,6 +50,7 @@ import org.opendc.compute.simulator.host.HostState;
import org.opendc.compute.simulator.host.SimHost;
import org.opendc.compute.simulator.scheduler.ComputeScheduler;
import org.opendc.compute.simulator.telemetry.SchedulerStats;
+import org.opendc.simulator.compute.power.SimPowerSource;
import org.opendc.simulator.compute.workload.Workload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +99,11 @@ public final class ComputeService implements AutoCloseable {
private final Set<HostView> availableHosts = new HashSet<>();
/**
+ * The available powerSources
+ */
+ private final Set<SimPowerSource> powerSources = new HashSet<>();
+
+ /**
* The tasks that should be launched by the service.
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
@@ -283,6 +289,15 @@ public final class ComputeService implements AutoCloseable {
host.addListener(hostListener);
}
+ public void addPowerSource(SimPowerSource simPowerSource) {
+ // Check if host is already known
+ if (powerSources.contains(simPowerSource)) {
+ return;
+ }
+
+ powerSources.add(simPowerSource);
+ }
+
/**
* Remove a {@link SimHost} from the scheduling pool of the compute service.
*/
@@ -313,6 +328,10 @@ public final class ComputeService implements AutoCloseable {
return this.clock;
}
+ public Set<SimPowerSource> getPowerSources() {
+ return Collections.unmodifiableSet(this.powerSources);
+ }
+
/**
* Collect the statistics about the scheduler component of this service.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
index 31ff384c..32fcf277 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -31,6 +31,7 @@ import org.opendc.compute.simulator.telemetry.GuestCpuStats
import org.opendc.compute.simulator.telemetry.GuestSystemStats
import org.opendc.compute.simulator.telemetry.HostCpuStats
import org.opendc.compute.simulator.telemetry.HostSystemStats
+import org.opendc.simulator.Multiplexer
import org.opendc.simulator.compute.cpu.CpuPowerModel
import org.opendc.simulator.compute.machine.SimMachine
import org.opendc.simulator.compute.models.MachineModel
@@ -61,6 +62,7 @@ public class SimHost(
private val graph: FlowGraph,
private val machineModel: MachineModel,
private val powerModel: CpuPowerModel,
+ private val powerMux: Multiplexer,
) : AutoCloseable {
/**
* The event listeners registered with this host.
@@ -130,6 +132,7 @@ public class SimHost(
this.graph,
this.machineModel,
this.powerModel,
+ this.powerMux,
) { cause ->
hostState = if (cause != null) HostState.ERROR else HostState.DOWN
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
index 07db3d26..d8bb703e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
@@ -27,6 +27,7 @@ package org.opendc.compute.simulator.provisioner
import org.opendc.compute.carbon.CarbonTrace
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostSpec
import java.time.Duration
@@ -74,7 +75,7 @@ public fun registerComputeMonitor(
*/
public fun setupHosts(
serviceDomain: String,
- specs: List<HostSpec>,
+ specs: List<ClusterSpec>,
): ProvisioningStep {
return HostsProvisioningStep(serviceDomain, specs)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
index 19674d5e..d2231f0d 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
@@ -24,7 +24,10 @@ package org.opendc.compute.simulator.provisioner
import org.opendc.compute.simulator.host.SimHost
import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostSpec
+import org.opendc.simulator.Multiplexer
+import org.opendc.simulator.compute.power.SimPowerSource
import org.opendc.simulator.engine.FlowEngine
/**
@@ -36,37 +39,56 @@ import org.opendc.simulator.engine.FlowEngine
*/
public class HostsProvisioningStep internal constructor(
private val serviceDomain: String,
- private val specs: List<HostSpec>,
+ private val clusterSpecs: List<ClusterSpec>,
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service =
requireNotNull(
ctx.registry.resolve(serviceDomain, ComputeService::class.java),
) { "Compute service $serviceDomain does not exist" }
- val hosts = mutableSetOf<SimHost>()
+ val simHosts = mutableSetOf<SimHost>()
+ val simPowerSources = mutableListOf<SimPowerSource>()
- val flowEngine = FlowEngine.create(ctx.dispatcher)
- val flowGraph = flowEngine.newGraph()
+ val engine = FlowEngine.create(ctx.dispatcher)
+ val graph = engine.newGraph()
- for (spec in specs) {
- val host =
- SimHost(
- spec.uid,
- spec.name,
- spec.meta,
- ctx.dispatcher.timeSource,
- flowGraph,
- spec.model,
- spec.cpuPowerModel,
- )
+ for (cluster in clusterSpecs) {
+ // Create the Power Source to which hosts are connected
+ // TODO: Add connection to totalPower
+ val simPowerSource = SimPowerSource(graph)
+ service.addPowerSource(simPowerSource)
+ simPowerSources.add(simPowerSource)
- require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
- service.addHost(host)
+ val powerMux = Multiplexer(graph)
+ graph.addEdge(powerMux, simPowerSource)
+
+ // Create hosts, they are connected to the powerMux when SimMachine is created
+ for (hostSpec in cluster.hostSpecs) {
+ val simHost =
+ SimHost(
+ hostSpec.uid,
+ hostSpec.name,
+ hostSpec.meta,
+ ctx.dispatcher.timeSource,
+ graph,
+ hostSpec.model,
+ hostSpec.cpuPowerModel,
+ powerMux,
+ )
+
+ require(simHosts.add(simHost)) { "Host with uid ${hostSpec.uid} already exists" }
+ service.addHost(simHost)
+ }
}
return AutoCloseable {
- for (host in hosts) {
- host.close()
+ for (simHost in simHosts) {
+ simHost.close()
+ }
+
+ for (simPowerSource in simPowerSources) {
+ // TODO: add close function
+// simPowerSource.close()
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
index c84b2a3f..fb7c8f89 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
@@ -29,18 +29,16 @@ import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.common.Dispatcher
import org.opendc.common.asCoroutineDispatcher
-import org.opendc.compute.api.TaskState
import org.opendc.compute.carbon.CarbonTrace
import org.opendc.compute.simulator.host.SimHost
import org.opendc.compute.simulator.service.ComputeService
import org.opendc.compute.simulator.service.ServiceTask
-import org.opendc.compute.simulator.telemetry.table.HostInfo
-import org.opendc.compute.simulator.telemetry.table.HostTableReader
-import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
-import org.opendc.compute.simulator.telemetry.table.TaskInfo
-import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import org.opendc.compute.simulator.telemetry.table.HostTableReaderImpl
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReaderImpl
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReaderImpl
+import org.opendc.compute.simulator.telemetry.table.TaskTableReaderImpl
+import org.opendc.simulator.compute.power.SimPowerSource
import java.time.Duration
-import java.time.Instant
/**
* A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
@@ -75,16 +73,21 @@ public class ComputeMetricReader(
private var loggCounter = 0
/**
- * Mapping from [Host] instances to [HostTableReaderImpl]
+ * Mapping from [SimHost] instances to [HostTableReaderImpl]
*/
private val hostTableReaders = mutableMapOf<SimHost, HostTableReaderImpl>()
/**
- * Mapping from [Task] instances to [TaskTableReaderImpl]
+ * Mapping from [ServiceTask] instances to [TaskTableReaderImpl]
*/
private val taskTableReaders = mutableMapOf<ServiceTask, TaskTableReaderImpl>()
/**
+ * Mapping from [SimPowerSource] instances to [PowerSourceTableReaderImpl]
+ */
+ private val powerSourceTableReaders = mutableMapOf<SimPowerSource, PowerSourceTableReaderImpl>()
+
+ /**
* The background job that is responsible for collecting the metrics every cycle.
*/
private val job =
@@ -143,6 +146,21 @@ public class ComputeMetricReader(
}
this.service.clearTasksToRemove()
+ for (simPowerSource in this.service.powerSources) {
+ val reader =
+ this.powerSourceTableReaders.computeIfAbsent(simPowerSource) {
+ PowerSourceTableReaderImpl(
+ it,
+ startTime,
+ carbonTrace,
+ )
+ }
+
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
this.serviceTableReader.record(now)
monitor.record(this.serviceTableReader.copy())
@@ -165,500 +183,4 @@ public class ComputeMetricReader(
override fun close() {
job.cancel()
}
-
- /**
- * An aggregator for service metrics before they are reported.
- */
- private class ServiceTableReaderImpl(
- private val service: ComputeService,
- private val startTime: Duration = Duration.ofMillis(0),
- ) : ServiceTableReader {
- override fun copy(): ServiceTableReader {
- val newServiceTable =
- ServiceTableReaderImpl(
- service,
- )
- newServiceTable.setValues(this)
-
- return newServiceTable
- }
-
- override fun setValues(table: ServiceTableReader) {
- _timestamp = table.timestamp
- _timestampAbsolute = table.timestampAbsolute
-
- _hostsUp = table.hostsUp
- _hostsDown = table.hostsDown
- _tasksTotal = table.tasksTotal
- _tasksPending = table.tasksPending
- _tasksActive = table.tasksActive
- _tasksCompleted = table.tasksCompleted
- _tasksTerminated = table.tasksTerminated
- _attemptsSuccess = table.attemptsSuccess
- _attemptsFailure = table.attemptsFailure
- }
-
- private var _timestamp: Instant = Instant.MIN
- override val timestamp: Instant
- get() = _timestamp
-
- private var _timestampAbsolute: Instant = Instant.MIN
- override val timestampAbsolute: Instant
- get() = _timestampAbsolute
-
- override val hostsUp: Int
- get() = _hostsUp
- private var _hostsUp = 0
-
- override val hostsDown: Int
- get() = _hostsDown
- private var _hostsDown = 0
-
- override val tasksTotal: Int
- get() = _tasksTotal
- private var _tasksTotal = 0
-
- override val tasksPending: Int
- get() = _tasksPending
- private var _tasksPending = 0
-
- override val tasksCompleted: Int
- get() = _tasksCompleted
- private var _tasksCompleted = 0
-
- override val tasksActive: Int
- get() = _tasksActive
- private var _tasksActive = 0
-
- override val tasksTerminated: Int
- get() = _tasksTerminated
- private var _tasksTerminated = 0
-
- override val attemptsSuccess: Int
- get() = _attemptsSuccess
- private var _attemptsSuccess = 0
-
- override val attemptsFailure: Int
- get() = _attemptsFailure
- private var _attemptsFailure = 0
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- _timestamp = now
- _timestampAbsolute = now + startTime
-
- val stats = service.getSchedulerStats()
- _hostsUp = stats.hostsAvailable
- _hostsDown = stats.hostsUnavailable
- _tasksTotal = stats.tasksTotal
- _tasksPending = stats.tasksPending
- _tasksCompleted = stats.tasksCompleted
- _tasksActive = stats.tasksActive
- _tasksTerminated = stats.tasksTerminated
- _attemptsSuccess = stats.attemptsSuccess.toInt()
- _attemptsFailure = stats.attemptsFailure.toInt()
- }
- }
-
- /**
- * An aggregator for host metrics before they are reported.
- */
- private class HostTableReaderImpl(
- host: SimHost,
- private val startTime: Duration = Duration.ofMillis(0),
- private val carbonTrace: CarbonTrace = CarbonTrace(null),
- ) : HostTableReader {
- override fun copy(): HostTableReader {
- val newHostTable =
- HostTableReaderImpl(_host)
- newHostTable.setValues(this)
-
- return newHostTable
- }
-
- override fun setValues(table: HostTableReader) {
- _timestamp = table.timestamp
- _timestampAbsolute = table.timestampAbsolute
-
- _guestsTerminated = table.guestsTerminated
- _guestsRunning = table.guestsRunning
- _guestsError = table.guestsError
- _guestsInvalid = table.guestsInvalid
- _cpuLimit = table.cpuLimit
- _cpuDemand = table.cpuDemand
- _cpuUsage = table.cpuUsage
- _cpuUtilization = table.cpuUtilization
- _cpuActiveTime = table.cpuActiveTime
- _cpuIdleTime = table.cpuIdleTime
- _cpuStealTime = table.cpuStealTime
- _cpuLostTime = table.cpuLostTime
- _powerDraw = table.powerDraw
- _energyUsage = table.energyUsage
- _carbonIntensity = table.carbonIntensity
- _carbonEmission = table.carbonEmission
- _uptime = table.uptime
- _downtime = table.downtime
- _bootTime = table.bootTime
- _bootTimeAbsolute = table.bootTimeAbsolute
- }
-
- private val _host = host
-
- override val host: HostInfo =
- HostInfo(
- host.getUid().toString(),
- host.getName(),
- "x86",
- host.getModel().coreCount,
- host.getModel().cpuCapacity,
- host.getModel().memoryCapacity,
- )
-
- override val timestamp: Instant
- get() = _timestamp
- private var _timestamp = Instant.MIN
-
- override val timestampAbsolute: Instant
- get() = _timestampAbsolute
- private var _timestampAbsolute = Instant.MIN
-
- override val guestsTerminated: Int
- get() = _guestsTerminated
- private var _guestsTerminated = 0
-
- override val guestsRunning: Int
- get() = _guestsRunning
- private var _guestsRunning = 0
-
- override val guestsError: Int
- get() = _guestsError
- private var _guestsError = 0
-
- override val guestsInvalid: Int
- get() = _guestsInvalid
- private var _guestsInvalid = 0
-
- override val cpuLimit: Double
- get() = _cpuLimit
- private var _cpuLimit = 0.0
-
- override val cpuUsage: Double
- get() = _cpuUsage
- private var _cpuUsage = 0.0
-
- override val cpuDemand: Double
- get() = _cpuDemand
- private var _cpuDemand = 0.0
-
- override val cpuUtilization: Double
- get() = _cpuUtilization
- private var _cpuUtilization = 0.0
-
- override val cpuActiveTime: Long
- get() = _cpuActiveTime - previousCpuActiveTime
- private var _cpuActiveTime = 0L
- private var previousCpuActiveTime = 0L
-
- override val cpuIdleTime: Long
- get() = _cpuIdleTime - previousCpuIdleTime
- private var _cpuIdleTime = 0L
- private var previousCpuIdleTime = 0L
-
- override val cpuStealTime: Long
- get() = _cpuStealTime - previousCpuStealTime
- private var _cpuStealTime = 0L
- private var previousCpuStealTime = 0L
-
- override val cpuLostTime: Long
- get() = _cpuLostTime - previousCpuLostTime
- private var _cpuLostTime = 0L
- private var previousCpuLostTime = 0L
-
- override val powerDraw: Double
- get() = _powerDraw
- private var _powerDraw = 0.0
-
- override val energyUsage: Double
- get() = _energyUsage - previousEnergyUsage
- private var _energyUsage = 0.0
- private var previousEnergyUsage = 0.0
-
- override val carbonIntensity: Double
- get() = _carbonIntensity
- private var _carbonIntensity = 0.0
-
- override val carbonEmission: Double
- get() = _carbonEmission
- private var _carbonEmission = 0.0
-
- override val uptime: Long
- get() = _uptime - previousUptime
- private var _uptime = 0L
- private var previousUptime = 0L
-
- override val downtime: Long
- get() = _downtime - previousDowntime
- private var _downtime = 0L
- private var previousDowntime = 0L
-
- override val bootTime: Instant?
- get() = _bootTime
- private var _bootTime: Instant? = null
-
- override val bootTimeAbsolute: Instant?
- get() = _bootTimeAbsolute
- private var _bootTimeAbsolute: Instant? = null
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- val hostCpuStats = _host.getCpuStats()
- val hostSysStats = _host.getSystemStats()
-
- _timestamp = now
- _timestampAbsolute = now + startTime
-
- _guestsTerminated = hostSysStats.guestsTerminated
- _guestsRunning = hostSysStats.guestsRunning
- _guestsError = hostSysStats.guestsError
- _guestsInvalid = hostSysStats.guestsInvalid
- _cpuLimit = hostCpuStats.capacity
- _cpuDemand = hostCpuStats.demand
- _cpuUsage = hostCpuStats.usage
- _cpuUtilization = hostCpuStats.utilization
- _cpuActiveTime = hostCpuStats.activeTime
- _cpuIdleTime = hostCpuStats.idleTime
- _cpuStealTime = hostCpuStats.stealTime
- _cpuLostTime = hostCpuStats.lostTime
- _powerDraw = hostSysStats.powerDraw
- _energyUsage = hostSysStats.energyUsage
- _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute)
-
- _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) // convert energy usage from J to kWh
- _uptime = hostSysStats.uptime.toMillis()
- _downtime = hostSysStats.downtime.toMillis()
- _bootTime = hostSysStats.bootTime
- _bootTime = hostSysStats.bootTime + startTime
- }
-
- /**
- * Finish the aggregation for this cycle.
- */
- fun reset() {
- // Reset intermediate state for next aggregation
- previousCpuActiveTime = _cpuActiveTime
- previousCpuIdleTime = _cpuIdleTime
- previousCpuStealTime = _cpuStealTime
- previousCpuLostTime = _cpuLostTime
- previousEnergyUsage = _energyUsage
- previousUptime = _uptime
- previousDowntime = _downtime
-
- _guestsTerminated = 0
- _guestsRunning = 0
- _guestsError = 0
- _guestsInvalid = 0
-
- _cpuLimit = 0.0
- _cpuUsage = 0.0
- _cpuDemand = 0.0
- _cpuUtilization = 0.0
-
- _powerDraw = 0.0
- _energyUsage = 0.0
- _carbonIntensity = 0.0
- _carbonEmission = 0.0
- }
- }
-
- /**
- * An aggregator for task metrics before they are reported.
- */
- private class TaskTableReaderImpl(
- private val service: ComputeService,
- private val task: ServiceTask,
- private val startTime: Duration = Duration.ofMillis(0),
- ) : TaskTableReader {
- override fun copy(): TaskTableReader {
- val newTaskTable =
- TaskTableReaderImpl(
- service,
- task,
- )
- newTaskTable.setValues(this)
-
- return newTaskTable
- }
-
- override fun setValues(table: TaskTableReader) {
- host = table.host
-
- _timestamp = table.timestamp
- _timestampAbsolute = table.timestampAbsolute
-
- _cpuLimit = table.cpuLimit
- _cpuActiveTime = table.cpuActiveTime
- _cpuIdleTime = table.cpuIdleTime
- _cpuStealTime = table.cpuStealTime
- _cpuLostTime = table.cpuLostTime
- _uptime = table.uptime
- _downtime = table.downtime
- _provisionTime = table.provisionTime
- _bootTime = table.bootTime
- _bootTimeAbsolute = table.bootTimeAbsolute
-
- _creationTime = table.creationTime
- _finishTime = table.finishTime
-
- _taskState = table.taskState
- }
-
- /**
- * The static information about this task.
- */
- override val taskInfo =
- TaskInfo(
- task.uid.toString(),
- task.name,
- "vm",
- "x86",
- task.flavor.coreCount,
- task.flavor.memorySize,
- )
-
- /**
- * The [HostInfo] of the host on which the task is hosted.
- */
- override var host: HostInfo? = null
- private var _host: SimHost? = null
-
- private var _timestamp = Instant.MIN
- override val timestamp: Instant
- get() = _timestamp
-
- private var _timestampAbsolute = Instant.MIN
- override val timestampAbsolute: Instant
- get() = _timestampAbsolute
-
- override val uptime: Long
- get() = _uptime - previousUptime
- private var _uptime: Long = 0
- private var previousUptime = 0L
-
- override val downtime: Long
- get() = _downtime - previousDowntime
- private var _downtime: Long = 0
- private var previousDowntime = 0L
-
- override val provisionTime: Instant?
- get() = _provisionTime
- private var _provisionTime: Instant? = null
-
- override val bootTime: Instant?
- get() = _bootTime
- private var _bootTime: Instant? = null
-
- override val creationTime: Instant?
- get() = _creationTime
- private var _creationTime: Instant? = null
-
- override val finishTime: Instant?
- get() = _finishTime
- private var _finishTime: Instant? = null
-
- override val cpuLimit: Double
- get() = _cpuLimit
- private var _cpuLimit = 0.0
-
- override val cpuActiveTime: Long
- get() = _cpuActiveTime - previousCpuActiveTime
- private var _cpuActiveTime = 0L
- private var previousCpuActiveTime = 0L
-
- override val cpuIdleTime: Long
- get() = _cpuIdleTime - previousCpuIdleTime
- private var _cpuIdleTime = 0L
- private var previousCpuIdleTime = 0L
-
- override val cpuStealTime: Long
- get() = _cpuStealTime - previousCpuStealTime
- private var _cpuStealTime = 0L
- private var previousCpuStealTime = 0L
-
- override val cpuLostTime: Long
- get() = _cpuLostTime - previousCpuLostTime
- private var _cpuLostTime = 0L
- private var previousCpuLostTime = 0L
-
- override val bootTimeAbsolute: Instant?
- get() = _bootTimeAbsolute
- private var _bootTimeAbsolute: Instant? = null
-
- override val taskState: TaskState?
- get() = _taskState
- private var _taskState: TaskState? = null
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- val newHost = service.lookupHost(task)
- if (newHost != null && newHost.getUid() != _host?.getUid()) {
- _host = newHost
- host =
- HostInfo(
- newHost.getUid().toString(),
- newHost.getName(),
- "x86",
- newHost.getModel().coreCount,
- newHost.getModel().cpuCapacity,
- newHost.getModel().memoryCapacity,
- )
- }
-
- val cpuStats = _host?.getCpuStats(task)
- val sysStats = _host?.getSystemStats(task)
-
- _timestamp = now
- _timestampAbsolute = now + startTime
-
- _cpuLimit = cpuStats?.capacity ?: 0.0
- _cpuActiveTime = cpuStats?.activeTime ?: 0
- _cpuIdleTime = cpuStats?.idleTime ?: 0
- _cpuStealTime = cpuStats?.stealTime ?: 0
- _cpuLostTime = cpuStats?.lostTime ?: 0
- _uptime = sysStats?.uptime?.toMillis() ?: 0
- _downtime = sysStats?.downtime?.toMillis() ?: 0
- _provisionTime = task.launchedAt
- _bootTime = sysStats?.bootTime
- _creationTime = task.createdAt
- _finishTime = task.finishedAt
-
- _taskState = task.state
-
- if (sysStats != null) {
- _bootTimeAbsolute = sysStats.bootTime + startTime
- } else {
- _bootTimeAbsolute = null
- }
- }
-
- /**
- * Finish the aggregation for this cycle.
- */
- fun reset() {
- previousUptime = _uptime
- previousDowntime = _downtime
- previousCpuActiveTime = _cpuActiveTime
- previousCpuIdleTime = _cpuIdleTime
- previousCpuStealTime = _cpuStealTime
- previousCpuLostTime = _cpuLostTime
-
- _host = null
- _cpuLimit = 0.0
- }
- }
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt
index 534bcc09..5e1fe2c9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMonitor.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.simulator.telemetry
import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader
import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
import org.opendc.compute.simulator.telemetry.table.TaskTableReader
@@ -43,5 +44,10 @@ public interface ComputeMonitor {
/**
* Record an entry with the specified [reader].
*/
+ public fun record(reader: PowerSourceTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
public fun record(reader: ServiceTableReader) {}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
index 3f220ad1..691d01c1 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ComputeExportConfig.kt
@@ -36,6 +36,7 @@ import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.jsonObject
import org.opendc.common.logger.logger
import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader
import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
import org.opendc.compute.simulator.telemetry.table.TaskTableReader
import org.opendc.trace.util.parquet.exporter.ColListSerializer
@@ -49,21 +50,25 @@ import org.opendc.trace.util.parquet.exporter.columnSerializer
*
* @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
* @param[taskExportColumns] the columns that will be included in the `task.parquet` raw output file.
+ * @param[powerSourceExportColumns] the columns that will be included in the `power.parquet` raw output file.
* @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
*/
@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
public data class ComputeExportConfig(
public val hostExportColumns: Set<ExportColumn<HostTableReader>>,
public val taskExportColumns: Set<ExportColumn<TaskTableReader>>,
+ public val powerSourceExportColumns: Set<ExportColumn<PowerSourceTableReader>>,
public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>,
) {
public constructor(
hostExportColumns: Collection<ExportColumn<HostTableReader>>,
taskExportColumns: Collection<ExportColumn<TaskTableReader>>,
+ powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>,
serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>,
) : this(
hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS,
taskExportColumns.toSet() + DfltTaskExportColumns.BASE_EXPORT_COLUMNS,
+ powerSourceExportColumns.toSet() + DfltPowerSourceExportColumns.BASE_EXPORT_COLUMNS,
serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS,
)
@@ -75,6 +80,7 @@ public data class ComputeExportConfig(
| === Compute Export Config ===
| Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')}
| Task columns : ${taskExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Power Source columns : ${powerSourceExportColumns.map { it.name }.toString().trim('[', ']')}
| Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')}
""".trimIndent()
@@ -88,19 +94,21 @@ public data class ComputeExportConfig(
public fun loadDfltColumns() {
DfltHostExportColumns
DfltTaskExportColumns
+ DfltPowerSourceExportColumns
DfltServiceExportColumns
}
/**
- * Config that includes all columns defined in [DfltHostExportColumns],
- * [DfltTaskExportColumns], [DfltServiceExportColumns] among all other loaded
+ * Config that includes all columns defined in [DfltHostExportColumns], [DfltTaskExportColumns],
+ * [DfltPowerSourceExportColumns], [DfltServiceExportColumns] among all other loaded
* columns for [HostTableReader], [TaskTableReader] and [ServiceTableReader].
*/
public val ALL_COLUMNS: ComputeExportConfig by lazy {
- ComputeExportConfig.Companion.loadDfltColumns()
+ loadDfltColumns()
ComputeExportConfig(
hostExportColumns = ExportColumn.getAllLoadedColumns(),
taskExportColumns = ExportColumn.getAllLoadedColumns(),
+ powerSourceExportColumns = ExportColumn.getAllLoadedColumns(),
serviceExportColumns = ExportColumn.getAllLoadedColumns(),
)
}
@@ -122,6 +130,10 @@ public data class ComputeExportConfig(
ListSerializer(columnSerializer<TaskTableReader>()).descriptor,
)
element(
+ "powerSourceExportColumns",
+ ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
+ )
+ element(
"serviceExportColumns",
ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
)
@@ -135,16 +147,18 @@ public data class ComputeExportConfig(
}
// Loads the default columns so that they are available for deserialization.
- ComputeExportConfig.Companion.loadDfltColumns()
+ loadDfltColumns()
val elem = jsonDec.decodeJsonElement().jsonObject
val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
val taskFields: List<ExportColumn<TaskTableReader>> = elem["taskExportColumns"].toFieldList()
+ val powerSourceFields: List<ExportColumn<PowerSourceTableReader>> = elem["powerSourceExportColumns"].toFieldList()
val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()
return ComputeExportConfig(
hostExportColumns = hostFields,
taskExportColumns = taskFields,
+ powerSourceExportColumns = powerSourceFields,
serviceExportColumns = serviceFields,
)
}
@@ -153,22 +167,28 @@ public data class ComputeExportConfig(
encoder: Encoder,
value: ComputeExportConfig,
) {
- encoder.encodeStructure(ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor) {
+ encoder.encodeStructure(descriptor) {
encodeSerializableElement(
- ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ descriptor,
0,
ColListSerializer(columnSerializer<HostTableReader>()),
value.hostExportColumns.toList(),
)
encodeSerializableElement(
- ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ descriptor,
1,
ColListSerializer(columnSerializer<TaskTableReader>()),
value.taskExportColumns.toList(),
)
encodeSerializableElement(
- ComputeExportConfig.Companion.ComputeExportConfigSerializer.descriptor,
+ descriptor,
2,
+ ColListSerializer(columnSerializer<PowerSourceTableReader>()),
+ value.powerSourceExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ descriptor,
+ 3,
ColListSerializer(columnSerializer<ServiceTableReader>()),
value.serviceExportColumns.toList(),
)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt
new file mode 100644
index 00000000..95db55fe
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltPowerSourceExportColumns.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solves ambiguity for field
+ * names that are included in more than 1 exportable.
+ *
+ * Additionally, it allows to load all the fields at once by just its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the jvm.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltHostExportColumns
+ * ...
+ * ```
+ */
+public object DfltPowerSourceExportColumns {
+ public val TIMESTAMP: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val CPU_COUNT: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("hosts_connected"),
+ ) { it.hostsConnected }
+
+ public val POWER_DRAW: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("power_draw"),
+ ) { it.powerDraw }
+
+ public val ENERGY_USAGE: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("energy_usage"),
+ ) { it.energyUsage }
+
+ public val CARBON_INTENSITY: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("carbon_intensity"),
+ ) { it.carbonIntensity }
+
+ public val CARBON_EMISSION: ExportColumn<PowerSourceTableReader> =
+ ExportColumn(
+ field = Types.required(FLOAT).named("carbon_emission"),
+ ) { it.carbonEmission }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
index 4cd920c4..b3150018 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
@@ -24,6 +24,7 @@ package org.opendc.compute.simulator.telemetry.parquet
import org.opendc.compute.simulator.telemetry.ComputeMonitor
import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader
import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
import org.opendc.compute.simulator.telemetry.table.TaskTableReader
import org.opendc.trace.util.parquet.exporter.ExportColumn
@@ -37,6 +38,7 @@ import java.io.File
public class ParquetComputeMonitor(
private val hostExporter: Exporter<HostTableReader>,
private val taskExporter: Exporter<TaskTableReader>,
+ private val powerSourceExporter: Exporter<PowerSourceTableReader>,
private val serviceExporter: Exporter<ServiceTableReader>,
) : ComputeMonitor, AutoCloseable {
override fun record(reader: HostTableReader) {
@@ -47,6 +49,10 @@ public class ParquetComputeMonitor(
taskExporter.write(reader)
}
+ override fun record(reader: PowerSourceTableReader) {
+ powerSourceExporter.write(reader)
+ }
+
override fun record(reader: ServiceTableReader) {
serviceExporter.write(reader)
}
@@ -54,6 +60,7 @@ public class ParquetComputeMonitor(
override fun close() {
hostExporter.close()
taskExporter.close()
+ powerSourceExporter.close()
serviceExporter.close()
}
@@ -77,12 +84,13 @@ public class ParquetComputeMonitor(
bufferSize = bufferSize,
hostExportColumns = computeExportConfig.hostExportColumns,
taskExportColumns = computeExportConfig.taskExportColumns,
+ powerSourceExportColumns = computeExportConfig.powerSourceExportColumns,
serviceExportColumns = computeExportConfig.serviceExportColumns,
)
/**
* Constructor that loads default [ExportColumn]s defined in
- * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltServiceExportColumns]
+ * [DfltHostExportColumns], [DfltTaskExportColumns], [DfltPowerSourceExportColumns], [DfltServiceExportColumns]
* in case optional parameters are omitted and all fields need to be retrieved.
*
* @param[base] parent pathname for output file.
@@ -95,6 +103,7 @@ public class ParquetComputeMonitor(
bufferSize: Int,
hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null,
taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null,
+ powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>? = null,
serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null,
): ParquetComputeMonitor {
// Loads the fields in case they need to be retrieved if optional params are omitted.
@@ -113,6 +122,12 @@ public class ParquetComputeMonitor(
columns = taskExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
),
+ powerSourceExporter =
+ Exporter(
+ outputFile = File(base, "$partition/powerSource.parquet").also { it.parentFile.mkdirs() },
+ columns = powerSourceExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
serviceExporter =
Exporter(
outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt
index a5a862ce..cf8d3c8c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReader.kt
@@ -33,6 +33,10 @@ public interface HostTableReader : Exportable {
public fun setValues(table: HostTableReader)
+ public fun record(now: Instant)
+
+ public fun reset()
+
/**
* The [HostInfo] of the host to which the row belongs to.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt
new file mode 100644
index 00000000..ab8c0036
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/HostTableReaderImpl.kt
@@ -0,0 +1,240 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.compute.carbon.CarbonTrace
+import org.opendc.compute.simulator.host.SimHost
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * An aggregator for host metrics before they are reported.
+ */
+public class HostTableReaderImpl(
+ host: SimHost,
+ private val startTime: Duration = Duration.ofMillis(0),
+ private val carbonTrace: CarbonTrace = CarbonTrace(null),
+) : HostTableReader {
+ override fun copy(): HostTableReader {
+ val newHostTable =
+ HostTableReaderImpl(_host)
+ newHostTable.setValues(this)
+
+ return newHostTable
+ }
+
+ override fun setValues(table: HostTableReader) {
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _guestsTerminated = table.guestsTerminated
+ _guestsRunning = table.guestsRunning
+ _guestsError = table.guestsError
+ _guestsInvalid = table.guestsInvalid
+ _cpuLimit = table.cpuLimit
+ _cpuDemand = table.cpuDemand
+ _cpuUsage = table.cpuUsage
+ _cpuUtilization = table.cpuUtilization
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _powerDraw = table.powerDraw
+ _energyUsage = table.energyUsage
+ _carbonIntensity = table.carbonIntensity
+ _carbonEmission = table.carbonEmission
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _bootTime = table.bootTime
+ _bootTimeAbsolute = table.bootTimeAbsolute
+ }
+
+ private val _host = host
+
+ override val host: HostInfo =
+ HostInfo(
+ host.getUid().toString(),
+ host.getName(),
+ "x86",
+ host.getModel().coreCount,
+ host.getModel().cpuCapacity,
+ host.getModel().memoryCapacity,
+ )
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+ private var _timestampAbsolute = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerDraw: Double
+ get() = _powerDraw
+ private var _powerDraw = 0.0
+
+ override val energyUsage: Double
+ get() = _energyUsage - previousEnergyUsage
+ private var _energyUsage = 0.0
+ private var previousEnergyUsage = 0.0
+
+ override val carbonIntensity: Double
+ get() = _carbonIntensity
+ private var _carbonIntensity = 0.0
+
+ override val carbonEmission: Double
+ get() = _carbonEmission
+ private var _carbonEmission = 0.0
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val bootTimeAbsolute: Instant?
+ get() = _bootTimeAbsolute
+ private var _bootTimeAbsolute: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ override fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerDraw = hostSysStats.powerDraw
+ _energyUsage = hostSysStats.energyUsage
+ _carbonIntensity = carbonTrace.getCarbonIntensity(timestampAbsolute)
+
+ _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) // convert energy usage from J to kWh
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ _bootTime = hostSysStats.bootTime + startTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ override fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousEnergyUsage = _energyUsage
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerDraw = 0.0
+ _energyUsage = 0.0
+ _carbonIntensity = 0.0
+ _carbonEmission = 0.0
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt
new file mode 100644
index 00000000..cd2b2d2c
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReader.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.trace.util.parquet.exporter.Exportable
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface PowerSourceTableReader : Exportable {
+ public fun copy(): PowerSourceTableReader
+
+ public fun setValues(table: PowerSourceTableReader)
+
+ public fun record(now: Instant)
+
+ public fun reset()
+
+ /**
+ * The timestamp of the current entry of the reader relative to the start of the workload.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestampAbsolute: Instant
+
+ /**
+ * The number of connected hosts
+ */
+ public val hostsConnected: Int
+
+ /**
+ * The current power draw of the host in W.
+ */
+ public val powerDraw: Double
+
+ /**
+ * The total energy consumption of the host since last sample in J.
+ */
+ public val energyUsage: Double
+
+ /**
+ * The current carbon intensity of the host in gCO2 / kW.
+ */
+ public val carbonIntensity: Double
+
+ /**
+ * The current carbon emission since the last deadline in g.
+ */
+ public val carbonEmission: Double
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt
new file mode 100644
index 00000000..91918ea8
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/PowerSourceTableReaderImpl.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.compute.carbon.CarbonTrace
+import org.opendc.simulator.compute.power.SimPowerSource
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * An aggregator for task metrics before they are reported.
+ */
+public class PowerSourceTableReaderImpl(
+ powerSource: SimPowerSource,
+ private val startTime: Duration = Duration.ofMillis(0),
+ private val carbonTrace: CarbonTrace = CarbonTrace(null),
+) : PowerSourceTableReader {
+ override fun copy(): PowerSourceTableReader {
+ val newPowerSourceTable =
+ PowerSourceTableReaderImpl(
+ powerSource,
+ )
+ newPowerSourceTable.setValues(this)
+
+ return newPowerSourceTable
+ }
+
+ override fun setValues(table: PowerSourceTableReader) {
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _hostsConnected = table.hostsConnected
+ _powerDraw = table.powerDraw
+ _energyUsage = table.energyUsage
+ _carbonIntensity = table.carbonIntensity
+ _carbonEmission = table.carbonEmission
+ }
+
+ private val powerSource = powerSource
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ private var _timestampAbsolute = Instant.MIN
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+
+ override val hostsConnected: Int
+ get() = _hostsConnected
+ private var _hostsConnected: Int = 0
+
+ override val powerDraw: Double
+ get() = _powerDraw
+ private var _powerDraw = 0.0
+
+ override val energyUsage: Double
+ get() = _energyUsage - previousEnergyUsage
+ private var _energyUsage = 0.0
+ private var previousEnergyUsage = 0.0
+
+ override val carbonIntensity: Double
+ get() = _carbonIntensity
+ private var _carbonIntensity = 0.0
+
+ override val carbonEmission: Double
+ get() = _carbonEmission
+ private var _carbonEmission = 0.0
+
+ /**
+ * Record the next cycle.
+ */
+ override fun record(now: Instant) {
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ _hostsConnected = 0
+ _powerDraw = powerSource.powerDraw
+ _energyUsage = powerSource.energyUsage
+ _carbonIntensity = 0.0
+ _carbonEmission = carbonIntensity * (energyUsage / 3600000.0)
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ override fun reset() {
+ previousEnergyUsage = _energyUsage
+
+ _hostsConnected = 0
+ _powerDraw = 0.0
+ _energyUsage = 0.0
+ _carbonIntensity = 0.0
+ _carbonEmission = 0.0
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt
index 690dfe0a..c8cc765a 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReader.kt
@@ -33,6 +33,8 @@ public interface ServiceTableReader : Exportable {
public fun setValues(table: ServiceTableReader)
+ public fun record(now: Instant)
+
/**
* The timestamp of the current entry of the reader.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt
new file mode 100644
index 00000000..52a25021
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/ServiceTableReaderImpl.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.compute.simulator.service.ComputeService
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * An aggregator for service metrics before they are reported.
+ */
+public class ServiceTableReaderImpl(
+ private val service: ComputeService,
+ private val startTime: Duration = Duration.ofMillis(0),
+) : ServiceTableReader {
+ override fun copy(): ServiceTableReader {
+ val newServiceTable =
+ ServiceTableReaderImpl(
+ service,
+ )
+ newServiceTable.setValues(this)
+
+ return newServiceTable
+ }
+
+ override fun setValues(table: ServiceTableReader) {
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _hostsUp = table.hostsUp
+ _hostsDown = table.hostsDown
+ _tasksTotal = table.tasksTotal
+ _tasksPending = table.tasksPending
+ _tasksActive = table.tasksActive
+ _tasksCompleted = table.tasksCompleted
+ _tasksTerminated = table.tasksTerminated
+ _attemptsSuccess = table.attemptsSuccess
+ _attemptsFailure = table.attemptsFailure
+ }
+
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ private var _timestampAbsolute: Instant = Instant.MIN
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val tasksTotal: Int
+ get() = _tasksTotal
+ private var _tasksTotal = 0
+
+ override val tasksPending: Int
+ get() = _tasksPending
+ private var _tasksPending = 0
+
+ override val tasksCompleted: Int
+ get() = _tasksCompleted
+ private var _tasksCompleted = 0
+
+ override val tasksActive: Int
+ get() = _tasksActive
+ private var _tasksActive = 0
+
+ override val tasksTerminated: Int
+ get() = _tasksTerminated
+ private var _tasksTerminated = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ /**
+ * Record the next cycle.
+ */
+ override fun record(now: Instant) {
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _tasksTotal = stats.tasksTotal
+ _tasksPending = stats.tasksPending
+ _tasksCompleted = stats.tasksCompleted
+ _tasksActive = stats.tasksActive
+ _tasksTerminated = stats.tasksTerminated
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
index 825019e8..50ffa5fc 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt
@@ -35,6 +35,10 @@ public interface TaskTableReader : Exportable {
public fun setValues(table: TaskTableReader)
+ public fun record(now: Instant)
+
+ public fun reset()
+
/**
* The timestamp of the current entry of the reader relative to the start of the workload.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt
new file mode 100644
index 00000000..5a0897f7
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry.table
+
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.simulator.host.SimHost
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.service.ServiceTask
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * An aggregator for task metrics before they are reported.
+ */
+public class TaskTableReaderImpl(
+ private val service: ComputeService,
+ private val task: ServiceTask,
+ private val startTime: Duration = Duration.ofMillis(0),
+) : TaskTableReader {
+ override fun copy(): TaskTableReader {
+ val newTaskTable =
+ TaskTableReaderImpl(
+ service,
+ task,
+ )
+ newTaskTable.setValues(this)
+
+ return newTaskTable
+ }
+
+ override fun setValues(table: TaskTableReader) {
+ host = table.host
+
+ _timestamp = table.timestamp
+ _timestampAbsolute = table.timestampAbsolute
+
+ _cpuLimit = table.cpuLimit
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _provisionTime = table.provisionTime
+ _bootTime = table.bootTime
+ _bootTimeAbsolute = table.bootTimeAbsolute
+
+ _creationTime = table.creationTime
+ _finishTime = table.finishTime
+
+ _taskState = table.taskState
+ }
+
+ /**
+ * The static information about this task.
+ */
+ override val taskInfo: TaskInfo =
+ TaskInfo(
+ task.uid.toString(),
+ task.name,
+ "vm",
+ "x86",
+ task.flavor.coreCount,
+ task.flavor.memorySize,
+ )
+
+ /**
+ * The [HostInfo] of the host on which the task is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: SimHost? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ private var _timestampAbsolute = Instant.MIN
+ override val timestampAbsolute: Instant
+ get() = _timestampAbsolute
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val creationTime: Instant?
+ get() = _creationTime
+ private var _creationTime: Instant? = null
+
+ override val finishTime: Instant?
+ get() = _finishTime
+ private var _finishTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val bootTimeAbsolute: Instant?
+ get() = _bootTimeAbsolute
+ private var _bootTimeAbsolute: Instant? = null
+
+ override val taskState: TaskState?
+ get() = _taskState
+ private var _taskState: TaskState? = null
+
+ /**
+ * Record the next cycle.
+ */
+ override fun record(now: Instant) {
+ val newHost = service.lookupHost(task)
+ if (newHost != null && newHost.getUid() != _host?.getUid()) {
+ _host = newHost
+ host =
+ HostInfo(
+ newHost.getUid().toString(),
+ newHost.getName(),
+ "x86",
+ newHost.getModel().coreCount,
+ newHost.getModel().cpuCapacity,
+ newHost.getModel().memoryCapacity,
+ )
+ }
+
+ val cpuStats = _host?.getCpuStats(task)
+ val sysStats = _host?.getSystemStats(task)
+
+ _timestamp = now
+ _timestampAbsolute = now + startTime
+
+ _cpuLimit = cpuStats?.capacity ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = task.launchedAt
+ _bootTime = sysStats?.bootTime
+ _creationTime = task.createdAt
+ _finishTime = task.finishedAt
+
+ _taskState = task.state
+
+ if (sysStats != null) {
+ _bootTimeAbsolute = sysStats.bootTime + startTime
+ } else {
+ _bootTimeAbsolute = null
+ }
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ override fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0
+ }
+}