From d652fa2fa76556edd81d3b8087a0c943d462ec49 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Wed, 17 Apr 2024 18:17:17 +0200 Subject: Added support for carbon traces (#218) * Started with the carbon trace implementation * Moved the carbon trace system to the proper folders --- .../opendc-compute-carbon/build.gradle.kts | 37 ++++++ .../org/opendc/compute/carbon/CarbonTrace.kt | 113 +++++++++++++++++ .../org/opendc/compute/carbon/CarbonTraceLoader.kt | 136 ++++++++++++++++++++ .../org/opendc/compute/carbon/CarbonTraceReader.kt | 52 ++++++++ .../src/test/resources/log4j2.xml | 38 ++++++ .../opendc-compute-simulator/build.gradle.kts | 1 + .../provisioner/ComputeMonitorProvisioningStep.kt | 4 +- .../compute/simulator/provisioner/ComputeSteps.kt | 4 +- .../opendc-compute-telemetry/build.gradle.kts | 1 + .../compute/telemetry/ComputeMetricReader.kt | 21 +++- .../export/parquet/ParquetHostDataWriter.kt | 27 ++-- .../compute/telemetry/table/HostTableReader.kt | 10 ++ .../opendc-experiments-base/build.gradle.kts | 1 + .../experiments/base/models/scenario/Scenario.kt | 1 + .../base/models/scenario/ScenarioFactories.kt | 1 + .../base/models/scenario/ScenarioSpecs.kt | 2 + .../experiments/base/runner/ScenarioRunner.kt | 4 + opendc-trace/opendc-trace-api/build.gradle.kts | 1 + .../opendc/trace/conv/CarbonIntensityColumns.kt | 35 ++++++ .../org/opendc/trace/conv/ResourceColumns.kt | 6 + .../main/kotlin/org/opendc/trace/conv/Tables.kt | 2 + .../trace/formats/carbon/CarbonTableReader.kt | 140 +++++++++++++++++++++ .../trace/formats/carbon/CarbonTraceFormat.kt | 86 +++++++++++++ .../carbon/parquet/CarbonIntensityFragment.kt | 33 +++++ .../carbon/parquet/CarbonIntensityReadSupport.kt | 95 ++++++++++++++ .../parquet/CarbonIntensityRecordMaterializer.kt | 86 +++++++++++++ .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 14 +-- opendc-web/opendc-web-runner/build.gradle.kts | 1 + settings.gradle.kts | 1 + site/docs/documentation/Output.md | 105 ++++++++-------- 30 files changed, 987 insertions(+), 71 deletions(-) create mode 100644 opendc-compute/opendc-compute-carbon/build.gradle.kts create mode 100644 opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTrace.kt create mode 100644 opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceLoader.kt create mode 100644 opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceReader.kt create mode 100644 opendc-compute/opendc-compute-carbon/src/test/resources/log4j2.xml create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt diff --git a/opendc-compute/opendc-compute-carbon/build.gradle.kts b/opendc-compute/opendc-compute-carbon/build.gradle.kts new file mode 100644 index 00000000..58b7bc86 --- /dev/null +++ b/opendc-compute/opendc-compute-carbon/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +// Build configuration +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcCompute.opendcComputeApi) + implementation(projects.opendcCommon) + implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-api"))) + implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute"))) + + implementation(libs.kotlin.logging) +} diff --git a/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTrace.kt b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTrace.kt new file mode 100644 index 00000000..2ba3e4e3 --- /dev/null +++ b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTrace.kt @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.carbon + +import java.time.Instant + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM in MB. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. + */ +public data class CarbonFragment( + var startTime: Long, + var endTime: Long, + var carbonIntensity: Double, +) { + init { + require(endTime > startTime) { + "The end time of a report should be higher than the start time -> start time: $startTime, end time: $endTime" + } + require(carbonIntensity >= 0.0) { "carbon intensity cannot be negative" } + } +} + +public class CarbonTrace(reports: List? = null) { + private var index: Int = 0 + private val numberOfReports = reports?.size + private val reports = reports?.sortedBy { it.startTime } + + private fun hasPreviousReport(): Boolean { + return index > 0 + } + + private fun hasNextReport(): Boolean { + if (numberOfReports == null) { + return false + } + + return index < numberOfReports + } + + public fun getCarbonIntensity(timestamp: Instant): Double { + return getCarbonIntensity(timestamp.toEpochMilli()) + } + + /** + * Get the carbon intensity of the energy at a given timestamp + * Returns the carbon intensity of the first or last [CarbonFragment] + * if the given timestamp is outside the information + * + * @param timestamp + * @return The carbon intensity at the given timestamp in gCO2/kWh + */ + public fun getCarbonIntensity(timestamp: Long): Double { + if (reports == null) { + return 0.0 + } + + var currentFragment: CarbonFragment + + while (true) { + currentFragment = reports[index] + + if (currentFragment.startTime > timestamp) { + if (hasPreviousReport()) { + index-- + continue + } + break + } + + if (currentFragment.endTime <= timestamp) { + if (hasNextReport()) { + index++ + continue + } + break + } + + break + } + + return currentFragment.carbonIntensity + } +} diff --git a/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceLoader.kt b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceLoader.kt new file mode 100644 index 00000000..685a1fb3 --- /dev/null +++ b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceLoader.kt @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.carbon + +import mu.KotlinLogging +import org.opendc.trace.Trace +import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP +import org.opendc.trace.conv.CARBON_INTENSITY_VALUE +import org.opendc.trace.conv.TABLE_CARBON_INTENSITY +import java.io.File +import java.lang.ref.SoftReference +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap + +/** + * A helper class for loading compute workload traces into memory. + * + * @param baseDir The directory containing the traces. + */ +public class CarbonTraceLoader { + /** + * The logger for this instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap>>() + + private val builder = CarbonFragmentBuilder() + + /** + * Read the metadata into a workload. + */ + private fun parseCarbon(trace: Trace): List { + val reader = checkNotNull(trace.getTable(TABLE_CARBON_INTENSITY)).newReader() + + val startTimeCol = reader.resolve(CARBON_INTENSITY_TIMESTAMP) + val carbonIntensityCol = reader.resolve(CARBON_INTENSITY_VALUE) + + val entries = mutableListOf() + + try { + while (reader.nextRow()) { + val startTime = reader.getInstant(startTimeCol)!! + val carbonIntensity = reader.getDouble(carbonIntensityCol) + + builder.add(startTime, carbonIntensity) + } + + // Make sure the virtual machines are ordered by start time + builder.fixReportTimes() + + return builder.fragments + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * Load the trace with the specified [name] and [format]. + */ + public fun get(pathToFile: File): List { + val trace = Trace.open(pathToFile, "carbon") + + return parseCarbon(trace) + } + + /** + * Clear the workload cache. + */ + public fun reset() { + cache.clear() + } + + /** + * A builder for a VM trace. + */ + private class CarbonFragmentBuilder { + /** + * The total load of the trace. + */ + public val fragments: MutableList = mutableListOf() + + /** + * Add a fragment to the trace. + * + * @param startTime Timestamp at which the fragment starts (in epoch millis). + * @param carbonIntensity The carbon intensity during this fragment + */ + fun add( + startTime: Instant, + carbonIntensity: Double, + ) { + fragments.add( + CarbonFragment(startTime.toEpochMilli(), Long.MAX_VALUE, carbonIntensity), + ) + } + + fun fixReportTimes() { + fragments.sortBy { it.startTime } + + // For each report, set the end time to the start time of the next report + for (i in 0..fragments.size - 2) { + fragments[i].endTime = fragments[i + 1].startTime + } + + // Set the start time of each report to the minimum value + fragments[0].startTime = Long.MIN_VALUE + } + } +} diff --git a/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceReader.kt b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceReader.kt new file mode 100644 index 00000000..3e0269f8 --- /dev/null +++ b/opendc-compute/opendc-compute-carbon/src/main/kotlin/org/opendc/compute/carbon/CarbonTraceReader.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") + +package org.opendc.compute.carbon + +import java.io.File +import javax.management.InvalidAttributeValueException + +/** + * Construct a workload from a trace. + */ +public fun getCarbonTrace(pathToFile: String?): CarbonTrace { + if (pathToFile == null) { + return CarbonTrace(null) + } + + return getCarbonTrace(File(pathToFile)) +} + +/** + * Construct a workload from a trace. + */ +public fun getCarbonTrace(file: File): CarbonTrace { + if (!file.exists()) { + throw InvalidAttributeValueException("The carbon trace cannot be found") + } + + val fragments = CarbonTraceLoader().get(file) + + return CarbonTrace(fragments) +} diff --git a/opendc-compute/opendc-compute-carbon/src/test/resources/log4j2.xml b/opendc-compute/opendc-compute-carbon/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/opendc-compute/opendc-compute-carbon/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 9692f6ba..0cddd296 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { api(libs.microprofile.config) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.slf4j.simple) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt index 09bc375d..f1123742 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator.provisioner +import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.telemetry.ComputeMetricReader import org.opendc.compute.telemetry.ComputeMonitor @@ -36,13 +37,14 @@ public class ComputeMonitorProvisioningStep( private val monitor: ComputeMonitor, private val exportInterval: Duration, private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull( ctx.registry.resolve(serviceDomain, ComputeService::class.java), ) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval, startTime) + val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval, startTime, carbonTrace) return AutoCloseable { metricReader.close() } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 8597b6f4..7a6b6927 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator.provisioner +import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.telemetry.ComputeMonitor @@ -58,8 +59,9 @@ public fun registerComputeMonitor( monitor: ComputeMonitor, exportInterval: Duration = Duration.ofMinutes(5), startTime: Duration = Duration.ofMillis(0), + carbonTrace: CarbonTrace = CarbonTrace(null), ): ProvisioningStep { - return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime) + return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime, carbonTrace) } /** diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts index f7af3877..10f7c610 100644 --- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts +++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { implementation(libs.kotlin.logging) implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.core) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt index 91f7cecf..46759ed1 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -30,6 +30,7 @@ import mu.KotlinLogging import org.opendc.common.Dispatcher import org.opendc.common.asCoroutineDispatcher import org.opendc.compute.api.Server +import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.telemetry.table.HostInfo @@ -55,6 +56,7 @@ public class ComputeMetricReader( private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5), private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), ) : AutoCloseable { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) @@ -101,7 +103,7 @@ public class ComputeMetricReader( val now = this.clock.instant() for (host in this.service.hosts) { - val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it, startTime) } + val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it, startTime, carbonTrace) } reader.record(now) this.monitor.record(reader.copy()) reader.reset() @@ -218,6 +220,7 @@ public class ComputeMetricReader( private class HostTableReaderImpl( host: Host, private val startTime: Duration = Duration.ofMillis(0), + private val carbonTrace: CarbonTrace = CarbonTrace(null), ) : HostTableReader { override fun copy(): HostTableReader { val newHostTable = HostTableReaderImpl(_host) @@ -244,6 +247,8 @@ public class ComputeMetricReader( _cpuLostTime = table.cpuLostTime _powerDraw = table.powerDraw _energyUsage = table.energyUsage + _carbonIntensity = table.carbonIntensity + _carbonEmission = table.carbonEmission _uptime = table.uptime _downtime = table.downtime _bootTime = table.bootTime @@ -322,6 +327,14 @@ public class ComputeMetricReader( private var _energyUsage = 0.0 private var previousPowerTotal = 0.0 + override val carbonIntensity: Double + get() = _carbonIntensity + private var _carbonIntensity = 0.0 + + override val carbonEmission: Double + get() = _carbonEmission + private var _carbonEmission = 0.0 + override val uptime: Long get() = _uptime - previousUptime private var _uptime = 0L @@ -360,6 +373,9 @@ public class ComputeMetricReader( _cpuLostTime = hostCpuStats.lostTime _powerDraw = hostSysStats.powerDraw _energyUsage = hostSysStats.energyUsage + _carbonIntensity = carbonTrace.getCarbonIntensity(absoluteTimestamp) + + _carbonEmission = carbonIntensity * (energyUsage / 3600000.0) // convert energy usage from J to kWh _uptime = hostSysStats.uptime.toMillis() _downtime = hostSysStats.downtime.toMillis() _bootTime = hostSysStats.bootTime @@ -389,6 +405,9 @@ public class ComputeMetricReader( _cpuUtilization = 0.0 _powerDraw = 0.0 + _energyUsage = 0.0 + _carbonIntensity = 0.0 + _carbonEmission = 0.0 } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt index 4fc5a078..dc2d39c2 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt @@ -148,19 +148,27 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : consumer.addDouble(data.energyUsage) consumer.endField("energy_usage", 18) - consumer.startField("uptime", 19) + consumer.startField("carbon_intensity", 19) + consumer.addDouble(data.carbonIntensity) + consumer.endField("carbon_intensity", 19) + + consumer.startField("carbon_emission", 20) + consumer.addDouble(data.carbonEmission) + consumer.endField("carbon_emission", 20) + + consumer.startField("uptime", 21) consumer.addLong(data.uptime) - consumer.endField("uptime", 19) + consumer.endField("uptime", 21) - consumer.startField("downtime", 20) + consumer.startField("downtime", 22) consumer.addLong(data.downtime) - consumer.endField("downtime", 20) + consumer.endField("downtime", 22) val bootTime = data.bootTime if (bootTime != null) { - consumer.startField("boot_time", 21) + consumer.startField("boot_time", 23) consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 21) + consumer.endField("boot_time", 23) } consumer.endMessage() @@ -233,6 +241,12 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("energy_usage"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("carbon_intensity"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("carbon_emission"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("uptime"), @@ -241,7 +255,6 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .named("downtime"), Types .optional(PrimitiveType.PrimitiveTypeName.INT64) -// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) .named("boot_time"), ) .named("host") diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt index 7246ef55..e6b19c11 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -117,6 +117,16 @@ public interface HostTableReader { */ public val energyUsage: Double + /** + * The current carbon intensity of the host in gCO2 / kW. + */ + public val carbonIntensity: Double + + /** + * The current carbon emission since the last deadline in g. + */ + public val carbonEmission: Double + /** * The uptime of the host since last time in ms. */ diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index 07d207a3..c75af87b 100644 --- a/opendc-experiments/opendc-experiments-base/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -41,6 +41,7 @@ dependencies { implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-core"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) runtimeOnly(libs.log4j.core) runtimeOnly(libs.log4j.slf4j) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/Scenario.kt index 192bba1e..f0e5717a 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/Scenario.kt @@ -30,6 +30,7 @@ public data class Scenario( val workload: WorkloadSpec, val allocationPolicy: AllocationPolicySpec, val failureModel: FailureModel?, + val carbonTracePath: String? = null, val exportModel: ExportSpec = ExportSpec(), val outputFolder: String = "output", val name: String = "", diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioFactories.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioFactories.kt index d806e95e..93b2a2b5 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioFactories.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioFactories.kt @@ -48,6 +48,7 @@ public fun getScenario(scenarioSpec: ScenarioSpec): Scenario { workload, allocationPolicy, failureModel, + scenarioSpec.carbonTracePath, exportModel, scenarioSpec.outputFolder, scenarioSpec.name, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioSpecs.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioSpecs.kt index 20c8a6e0..f39b16dd 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioSpecs.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/models/scenario/ScenarioSpecs.kt @@ -47,6 +47,7 @@ public data class ScenarioSpec( val workload: WorkloadSpec, val allocationPolicy: AllocationPolicySpec, val failureModel: FailureModelSpec = FailureModelSpec(), + val carbonTracePath: String? = null, val exportModel: ExportSpec = ExportSpec(), val outputFolder: String = "output", val initialSeed: Int = 0, @@ -55,6 +56,7 @@ public data class ScenarioSpec( ) { init { require(runs > 0) { "The number of runs should always be positive" } + require(carbonTracePath == null || File(carbonTracePath).exists()) { "The provided carbon trace cannot be found: $carbonTracePath" } // generate name if not provided if (name == "") { diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 04998309..59c11f34 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.base.runner import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle +import org.opendc.compute.carbon.getCarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeSchedulerEnum import org.opendc.compute.service.scheduler.createComputeScheduler @@ -118,6 +119,8 @@ public fun runScenario( setupHosts(serviceDomain, scenario.topology, optimize = true), ) + val carbonTrace = getCarbonTrace(scenario.carbonTracePath) + val partition = scenario.name + "/seed=$seed" val workloadLoader = ComputeWorkloadLoader(File(scenario.workload.pathToFile)) @@ -135,6 +138,7 @@ public fun runScenario( ), Duration.ofSeconds(scenario.exportModel.exportInterval), startTime, + carbonTrace, ), ) diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts index 02195ed0..f9da98c5 100644 --- a/opendc-trace/opendc-trace-api/build.gradle.kts +++ b/opendc-trace/opendc-trace-api/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { implementation(libs.jackson.dataformat.csv) + implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) testImplementation(project(mapOf("path" to ":opendc-trace:opendc-trace-testkit"))) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt new file mode 100644 index 00000000..de74c4fd --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("CarbonIntensityColumns") + +package org.opendc.trace.conv + +/** + * A column containing the task identifier. + */ +public const val CARBON_INTENSITY_TIMESTAMP: String = "timestamp" + +/** + * A column containing the task identifier. + */ +public const val CARBON_INTENSITY_VALUE: String = "carbon_intensity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 046dd13d..baaa0690 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -42,6 +42,12 @@ public val resourceClusterID: String = "cluster_id" @JvmField public val resourceStartTime: String = "start_time" +/** + * Start time for the resource. + */ +@JvmField +public val resourceCarbonIntensity: String = "carbon_intensity" + /** * End time for the resource. */ diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt index 495628da..9b8fc6cf 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt @@ -48,3 +48,5 @@ public const val TABLE_RESOURCE_STATES: String = "resource_states" * A table containing the groups of resources that interfere when run on the same execution platform. */ public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" + +public const val TABLE_CARBON_INTENSITY: String = "carbon_intensities" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt new file mode 100644 index 00000000..226c8806 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon + +import org.opendc.trace.TableReader +import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP +import org.opendc.trace.conv.CARBON_INTENSITY_VALUE +import org.opendc.trace.formats.carbon.parquet.CarbonIntensityFragment +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableReader] implementation for the WTF format. + */ +internal class CarbonTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: CarbonIntensityFragment? = null + + override fun nextRow(): Boolean { + try { + val record = reader.read() + this.record = record + + return record != null + } catch (e: Throwable) { + this.record = null + throw e + } + } + + private val colTimestamp = 0 + private val colCarbonIntensity = 1 + + override fun resolve(name: String): Int { + return when (name) { + CARBON_INTENSITY_TIMESTAMP -> colTimestamp + CARBON_INTENSITY_VALUE -> colCarbonIntensity + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + require(index in colTimestamp..colCarbonIntensity) { "Invalid column index" } + return false + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colCarbonIntensity -> record.carbonIntensity + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getString(index: Int): String { + throw IllegalArgumentException("Invalid column") + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colTimestamp -> record.timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration { + throw IllegalArgumentException("Invalid column") + } + + override fun getList( + index: Int, + elementType: Class, + ): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet( + index: Int, + elementType: Class, + ): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt new file mode 100644 index 00000000..0daa1297 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon + +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP +import org.opendc.trace.conv.CARBON_INTENSITY_VALUE +import org.opendc.trace.conv.TABLE_CARBON_INTENSITY +import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport +import org.opendc.trace.spi.TableDetails +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * A [TraceFormat] implementation for the Carbon Intensity trace. + */ +public class CarbonTraceFormat : TraceFormat { + override val name: String = "carbon_intensity" + + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List = listOf(TABLE_CARBON_INTENSITY) + + override fun getDetails( + path: Path, + table: String, + ): TableDetails { + return when (table) { + TABLE_CARBON_INTENSITY -> + TableDetails( + listOf( + TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant), + TableColumn(CARBON_INTENSITY_VALUE, TableColumnType.Double), + ), + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader( + path: Path, + table: String, + projection: List?, + ): TableReader { + return when (table) { + TABLE_CARBON_INTENSITY -> { + val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection)) + CarbonTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter( + path: Path, + table: String, + ): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt new file mode 100644 index 00000000..3211cb6c --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import java.time.Instant + +/** + * A task in the Workflow Trace Format. + */ +internal data class CarbonIntensityFragment( + val timestamp: Instant, + val carbonIntensity: Double, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt new file mode 100644 index 00000000..2f4eac05 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types +import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP +import org.opendc.trace.conv.CARBON_INTENSITY_VALUE + +/** + * A [ReadSupport] instance for [Task] objects. + * + * @param projection The projection of the table to read. + */ +internal class CarbonIntensityReadSupport(private val projection: List?) : ReadSupport() { + /** + * Mapping of table columns to their Parquet column names. + */ + private val colMap = + mapOf( + CARBON_INTENSITY_TIMESTAMP to "timestamp", + CARBON_INTENSITY_VALUE to "carbon_intensity", + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer = CarbonIntensityRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema for the "tasks" table in the trace. + */ + @JvmStatic + val READ_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("carbon_intensity"), + ) + .named("carbon_intensity_fragment") + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt new file mode 100644 index 00000000..f5d68f22 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [Task] records. + */ +internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var localTimestamp: Instant = Instant.MIN + private var localCarbonIntensity: Double = 0.0 + + /** + * Root converter for the record. + */ + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "timestamp" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localTimestamp = Instant.ofEpochMilli(value) + } + } + "carbon_intensity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCarbonIntensity = value + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + localTimestamp = Instant.MIN + localCarbonIntensity = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): CarbonIntensityFragment = + CarbonIntensityFragment( + localTimestamp, + localCarbonIntensity, + ) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 89cac608..67df667b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -26,6 +26,7 @@ import org.opendc.trace.TableReader import org.opendc.trace.TableWriter import org.opendc.trace.azure.AzureTraceFormat import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.formats.carbon.CarbonTraceFormat import org.opendc.trace.formats.opendc.OdcVmTraceFormat import org.opendc.trace.gwf.GwfTraceFormat import org.opendc.trace.swf.SwfTraceFormat @@ -114,26 +115,17 @@ public interface TraceFormat { return ServiceLoader.load(TraceFormat::class.java) } -// /** -// * Obtain a [TraceFormat] implementation by [name]. -// */ -// @JvmStatic -// public fun byName(name: String): TraceFormat? { -// -// val loader = ServiceLoader.load(TraceFormat::class.java) -// return loader.find { it.name == name } -// } - /** * Obtain a [TraceFormat] implementation by [name]. */ @JvmStatic public fun byName(name: String): TraceFormat? { return when (name) { - "opendc-vm" -> OdcVmTraceFormat() "azure" -> AzureTraceFormat() "bitbrains" -> BitbrainsTraceFormat() + "carbon" -> CarbonTraceFormat() "gwf" -> GwfTraceFormat() + "opendc-vm" -> OdcVmTraceFormat() "swf" -> SwfTraceFormat() "wfformat" -> WfFormatTraceFormat() "wtf" -> WtfTraceFormat() diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index fcf9a89a..d5d1dbc6 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -54,6 +54,7 @@ dependencies { implementation(libs.kotlin.logging) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-workload"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) implementation(project(mapOf("path" to ":opendc-experiments:opendc-experiments-base"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) diff --git a/settings.gradle.kts b/settings.gradle.kts index 8553ad9f..26ff6d34 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,6 +28,7 @@ include(":opendc-compute:opendc-compute-telemetry") include(":opendc-compute:opendc-compute-topology") include(":opendc-compute:opendc-compute-simulator") include(":opendc-compute:opendc-compute-workload") +include(":opendc-compute:opendc-compute-carbon") include(":opendc-workflow:opendc-workflow-api") include(":opendc-workflow:opendc-workflow-service") include(":opendc-faas:opendc-faas-api") diff --git a/site/docs/documentation/Output.md b/site/docs/documentation/Output.md index ef968741..176c8721 100644 --- a/site/docs/documentation/Output.md +++ b/site/docs/documentation/Output.md @@ -6,61 +6,66 @@ contains metrics describing the overall performance. An experiment in OpenDC has ### Server The server output file, contains all metrics of related to the servers run. -| Metric | Datatype | Unit | Summary | -|-----------------|----------|--------|-------------------------------------------------------------------------------| -| timestamp | int64 | ms | Timestamp of the sample | -| server_id | binary | string | The id of the server determined during runtime | -| server_name | binary | string | The name of the server provided by the Trace | -| host_id | binary | string | The id of the host on which the server is hosted or `null` if it has no host. | -| mem_capacity | int64 | Mb | | -| cpu_count | int32 | count | | -| cpu_limit | double | MHz | The capacity of the CPUs of Host on which the server is running. | -| cpu_time_active | int64 | ms | The duration that a CPU was active in the server. | -| cpu_time_idle | int64 | ms | The duration that a CPU was idle in the server. | -| cpu_time_steal | int64 | ms | The duration that a vCPU wanted to run, but no capacity was available. | -| cpu_time_lost | int64 | ms | The duration of CPU time that was lost due to interference. | -| uptime | int64 | ms | The uptime of the host since last sample. | -| downtime | int64 | ms | The downtime of the host since last sample. | -| provision_time | int64 | ms | The time for which the server was enqueued for the scheduler. | -| boot_time | int64 | ms | The time the server took booting. | +| Metric | Datatype | Unit | Summary | +|--------------------|----------|--------|-------------------------------------------------------------------------------| +| timestamp | int64 | ms | Timestamp of the sample since the start of the workload | +| absolute timestamp | int64 | ms | The absolute timestamp based on the given workload | +| server_id | binary | string | The id of the server determined during runtime | +| server_name | binary | string | The name of the server provided by the Trace | +| host_id | binary | string | The id of the host on which the server is hosted or `null` if it has no host. | +| mem_capacity | int64 | Mb | | +| cpu_count | int32 | count | | +| cpu_limit | double | MHz | The capacity of the CPUs of Host on which the server is running. | +| cpu_time_active | int64 | ms | The duration that a CPU was active in the server. | +| cpu_time_idle | int64 | ms | The duration that a CPU was idle in the server. | +| cpu_time_steal | int64 | ms | The duration that a vCPU wanted to run, but no capacity was available. | +| cpu_time_lost | int64 | ms | The duration of CPU time that was lost due to interference. | +| uptime | int64 | ms | The uptime of the host since last sample. | +| downtime | int64 | ms | The downtime of the host since last sample. | +| provision_time | int64 | ms | The time for which the server was enqueued for the scheduler. | +| boot_time | int64 | ms | The time the server took booting. | ### Host The host output file, contains all metrics of related to the host run. -| Metric | DataType | Unit | Summary | -|-------------------|----------|------------|-------------------------------------------------------------------------------------------------| -| timestamp | int64 | ms | Timestamp of the sample | -| host_id | binary | string | The id of the host given by OpenDC | -| cpu_count | int32 | count | The number of available cpu cores | -| mem_capacity | int64 | Mb | The amount of available memory | -| guests_terminated | int32 | count | The number of guests that are in a terminated state. | -| guests_running | int32 | count | The number of guests that are in a running state. | -| guests_error | int32 | count | The number of guests that are in an error state. | -| guests_invalid | int32 | count | The number of guests that are in an unknown state. | -| cpu_limit | double | MHz | The capacity of the CPUs in the host. | -| cpu_usage | double | MHz | The usage of all CPUs in the host. | -| cpu_demand | double | MHz | The demand of all vCPUs of the guests | -| cpu_utilization | double | ratio | The CPU utilization of the host. This is calculated by dividing the cpu_usage, by the cpu_limit | -| cpu_time_active | int64 | ms | The duration that a CPU was active in the host. | -| cpu_time_idle | int64 | ms | The duration that a CPU was idle in the host. | -| cpu_time_steal | int64 | ms | The duration that a vCPU wanted to run, but no capacity was available. | -| cpu_time_lost | int64 | ms | The duration of CPU time that was lost due to interference. | -| power_draw | double | Watt | The current power draw of the host. | -| energy_usage | double | Joule (Ws) | he total energy consumption of the host since last sample. | -| uptime | int64 | ms | The uptime of the host since last sample. | -| downtime | int64 | ms | The downtime of the host since last sample. | -| boot_time | int64 | ms | The time the host took to boot. | +| Metric | DataType | Unit | Summary | +|--------------------|----------|------------|-------------------------------------------------------------------------------------------------| +| timestamp | int64 | ms | Timestamp of the sample | +| absolute timestamp | int64 | ms | The absolute timestamp based on the given workload | +| host_id | binary | string | The id of the host given by OpenDC | +| cpu_count | int32 | count | The number of available cpu cores | +| mem_capacity | int64 | Mb | The amount of available memory | +| guests_terminated | int32 | count | The number of guests that are in a terminated state. | +| guests_running | int32 | count | The number of guests that are in a running state. | +| guests_error | int32 | count | The number of guests that are in an error state. | +| guests_invalid | int32 | count | The number of guests that are in an unknown state. | +| cpu_limit | double | MHz | The capacity of the CPUs in the host. | +| cpu_usage | double | MHz | The usage of all CPUs in the host. | +| cpu_demand | double | MHz | The demand of all vCPUs of the guests | +| cpu_utilization | double | ratio | The CPU utilization of the host. This is calculated by dividing the cpu_usage, by the cpu_limit | +| cpu_time_active | int64 | ms | The duration that a CPU was active in the host. | +| cpu_time_idle | int64 | ms | The duration that a CPU was idle in the host. | +| cpu_time_steal | int64 | ms | The duration that a vCPU wanted to run, but no capacity was available. | +| cpu_time_lost | int64 | ms | The duration of CPU time that was lost due to interference. | +| power_draw | double | Watt | The current power draw of the host. | +| energy_usage | double | Joule (Ws) | The total energy consumption of the host since last sample. | +| carbon_intensity | double | gCO2/kW | The amount of carbon that is emitted when using a unit of energy | +| carbon_emission | double | gram | The amount of carbon emitted since the previous sample | +| uptime | int64 | ms | The uptime of the host since last sample. | +| downtime | int64 | ms | The downtime of the host since last sample. | +| boot_time | int64 | ms | The time the host took to boot. | ### Service The service output file, contains metrics providing an overview of the performance. -| Metric | DataType | Unit | Summary | -|------------------|----------|-------|------------------------------------------------------------------------| -| timestamp | int64 | ms | Timestamp of the sample | -| hosts_up | int32 | count | The number of hosts that are up at this instant. | -| hosts_down | int32 | count | The number of hosts that are down at this instant. | -| servers_pending | int32 | count | The number of servers that are pending to be scheduled. | -| servers_active | int32 | count | The number of servers that are currently active. | -| attempts_success | int32 | count | The scheduling attempts that were successful. | -| attempts_failure | int32 | count | The scheduling attempts that were unsuccessful due to client error. | -| attempts_error | int32 | count | The scheduling attempts that were unsuccessful due to scheduler error. | +| Metric | DataType | Unit | Summary | +|--------------------|----------|-------|------------------------------------------------------------------------| +| timestamp | int64 | ms | Timestamp of the sample | +| absolute timestamp | int64 | ms | The absolute timestamp based on the given workload | +| hosts_up | int32 | count | The number of hosts that are up at this instant. | +| hosts_down | int32 | count | The number of hosts that are down at this instant. | +| servers_pending | int32 | count | The number of servers that are pending to be scheduled. | +| servers_active | int32 | count | The number of servers that are currently active. | +| attempts_success | int32 | count | The scheduling attempts that were successful. | +| attempts_failure | int32 | count | The scheduling attempts that were unsuccessful due to client error. | +| attempts_error | int32 | count | The scheduling attempts that were unsuccessful due to scheduler error. | -- cgit v1.2.3