From 616017ba78a0882fe38b9b171b2b0f68e593cd8d Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Mon, 8 Jan 2024 13:44:09 +0100 Subject: refactored opendc-experiment-compute (#190) * removed experiment-compute and integrated all components into opendc-compute * updated workflow gradle file * removed unneeded code --- .../compute/service/scheduler/ComputeSchedulers.kt | 85 ++++ .../opendc-compute-simulator/build.gradle.kts | 4 + .../compute/simulator/MutableServiceRegistry.kt | 58 +++ .../opendc/compute/simulator/ServiceRegistry.kt | 45 ++ .../compute/simulator/ServiceRegistryImpl.kt | 66 +++ .../compute/simulator/failure/FailureModel.kt | 43 ++ .../compute/simulator/failure/FailureModels.kt | 68 +++ .../provisioner/ComputeMonitorProvisioningStep.kt | 44 ++ .../provisioner/ComputeServiceProvisioningStep.kt | 49 ++ .../compute/simulator/provisioner/ComputeSteps.kt | 74 +++ .../simulator/provisioner/HostsProvisioningStep.kt | 75 +++ .../compute/simulator/provisioner/Provisioner.kt | 98 ++++ .../simulator/provisioner/ProvisioningContext.kt | 50 ++ .../simulator/provisioner/ProvisioningStep.kt | 61 +++ .../opendc-compute-telemetry/build.gradle.kts | 40 ++ .../compute/telemetry/ComputeMetricReader.kt | 508 +++++++++++++++++++++ .../org/opendc/compute/telemetry/ComputeMonitor.kt | 47 ++ .../export/parquet/ParquetComputeMonitor.kt | 67 +++ .../telemetry/export/parquet/ParquetDataWriter.kt | 132 ++++++ .../export/parquet/ParquetHostDataWriter.kt | 233 ++++++++++ .../export/parquet/ParquetServerDataWriter.kt | 208 +++++++++ .../export/parquet/ParquetServiceDataWriter.kt | 131 ++++++ .../compute/telemetry/export/parquet/Utils.kt | 38 ++ .../org/opendc/compute/telemetry/table/HostInfo.kt | 28 ++ .../compute/telemetry/table/HostTableReader.kt | 130 ++++++ .../opendc/compute/telemetry/table/ServerInfo.kt | 37 ++ .../compute/telemetry/table/ServerTableReader.kt | 95 ++++ .../opendc/compute/telemetry/table/ServiceData.kt | 57 +++ .../compute/telemetry/table/ServiceTableReader.kt | 80 ++++ .../src/test/resources/log4j2.xml | 38 ++ .../opendc-compute-topology/build.gradle.kts | 37 ++ .../org/opendc/compute/topology/ClusterSpec.kt | 46 ++ .../opendc/compute/topology/ClusterSpecReader.kt | 121 +++++ .../kotlin/org/opendc/compute/topology/HostSpec.kt | 48 ++ .../opendc/compute/topology/TopologyFactories.kt | 98 ++++ .../src/test/resources/log4j2.xml | 38 ++ .../opendc-compute-workload/build.gradle.kts | 37 ++ .../org/opendc/compute/workload/ComputeWorkload.kt | 35 ++ .../compute/workload/ComputeWorkloadLoader.kt | 266 +++++++++++ .../opendc/compute/workload/ComputeWorkloads.kt | 63 +++ .../org/opendc/compute/workload/VirtualMachine.kt | 54 +++ .../workload/internal/CompositeComputeWorkload.kt | 66 +++ .../workload/internal/HpcSampledComputeWorkload.kt | 142 ++++++ .../internal/LoadSampledComputeWorkload.kt | 61 +++ .../workload/internal/TraceComputeWorkload.kt | 37 ++ .../src/test/resources/log4j2.xml | 38 ++ 46 files changed, 3876 insertions(+) create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt create mode 100644 opendc-compute/opendc-compute-telemetry/build.gradle.kts create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml create mode 100644 opendc-compute/opendc-compute-topology/build.gradle.kts create mode 100644 opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt create mode 100644 opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt create mode 100644 opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt create mode 100644 opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt create mode 100644 opendc-compute/opendc-compute-topology/src/test/resources/log4j2.xml create mode 100644 opendc-compute/opendc-compute-workload/build.gradle.kts create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/resources/log4j2.xml (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt new file mode 100644 index 00000000..2f071c13 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") + +package org.opendc.compute.service.scheduler + +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.SplittableRandom +import java.util.random.RandomGenerator + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = SplittableRandom(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 72962147..625f278b 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -34,6 +34,10 @@ dependencies { implementation(projects.opendcCommon) implementation(libs.kotlin.logging) + api(libs.microprofile.config) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) + testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt new file mode 100644 index 00000000..49b3688e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +/** + * A mutable [ServiceRegistry]. + */ +public interface MutableServiceRegistry : ServiceRegistry { + /** + * Register [service] for the specified [name] in this registry. + * + * @param name The name of the service to register, which should follow the rules for domain names as defined by + * DNS. + * @param type The interface provided by the service. + * @param service A reference to the actual implementation of the service. + */ + public fun register(name: String, type: Class, service: T) + + /** + * Remove the service with [name] and [type] from this registry. + * + * @param name The name of the service to remove, which should follow the rules for domain names as defined by DNS. + * @param type The type of the service to remove. + */ + public fun remove(name: String, type: Class<*>) + + /** + * Remove all services registered with [name]. + * + * @param name The name of the services to remove, which should follow the rules for domain names as defined by DNS. + */ + public fun remove(name: String) + + /** + * Create a copy of the registry. + */ + public override fun clone(): MutableServiceRegistry +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt new file mode 100644 index 00000000..d3af3f01 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +/** + * A read-only registry of services used during experiments to resolve services. + * + * The service registry is similar conceptually to the Domain Name System (DNS), which is a naming system used to + * identify computers reachable via the Internet. The service registry should be used in a similar fashion. + */ +public interface ServiceRegistry { + /** + * Lookup the service with the specified [name] and [type]. + * + * @param name The name of the service to resolve, which should follow the rules for domain names as defined by DNS. + * @param type The type of the service to resolve, identified by the interface that is implemented by the service. + * @return The service with specified [name] and implementing [type] or `null` if it does not exist. + */ + public fun resolve(name: String, type: Class): T? + + /** + * Create a copy of the registry. + */ + public fun clone(): ServiceRegistry +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt new file mode 100644 index 00000000..a9d05844 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +/** + * Implementation of the [MutableServiceRegistry] interface. + */ +internal class ServiceRegistryImpl(private val registry: MutableMap, Any>> = mutableMapOf()) : + MutableServiceRegistry { + override fun resolve(name: String, type: Class): T? { + val servicesForName = registry[name] ?: return null + + @Suppress("UNCHECKED_CAST") + return servicesForName[type] as T? + } + + override fun register(name: String, type: Class, service: T) { + val services = registry.computeIfAbsent(name) { mutableMapOf() } + + if (type in services) { + throw IllegalStateException("Duplicate service $type registered for name $name") + } + + services[type] = service + } + + override fun remove(name: String, type: Class<*>) { + val services = registry[name] ?: return + services.remove(type) + } + + override fun remove(name: String) { + registry.remove(name) + } + + override fun clone(): MutableServiceRegistry { + val res = mutableMapOf, Any>>() + registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() } + return ServiceRegistryImpl(res) + } + + override fun toString(): String { + val entries = registry.map { "${it.key}=${it.value}" }.joinToString() + return "ServiceRegistry{$entries}" + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt new file mode 100644 index 00000000..5e94830c --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.service.ComputeService +import java.time.InstantSource +import java.util.random.RandomGenerator +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +public interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + public fun createInjector( + context: CoroutineContext, + clock: InstantSource, + service: ComputeService, + random: RandomGenerator + ): HostFaultInjector +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt new file mode 100644 index 00000000..337f3c60 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import java.time.Duration +import java.time.InstantSource +import java.util.random.RandomGenerator +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +public fun grid5000(failureInterval: Duration): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: InstantSource, + service: ComputeService, + random: RandomGenerator + ): HostFaultInjector { + val rng = Well19937c(random.nextLong()) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt new file mode 100644 index 00000000..50e7bd0d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.telemetry.ComputeMetricReader +import org.opendc.compute.telemetry.ComputeMonitor +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [ComputeMetricReader] to periodically collect the metrics of a [ComputeService] + * and report them to a [ComputeMonitor]. + */ +public class ComputeMonitorProvisioningStep( + private val serviceDomain: String, + private val monitor: ComputeMonitor, + private val exportInterval: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) + return AutoCloseable { metricReader.close() } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt new file mode 100644 index 00000000..fc555016 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [ComputeService] without any hosts. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param scheduler A function to construct the compute scheduler. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public class ComputeServiceProvisioningStep internal constructor( + private val serviceDomain: String, + private val scheduler: (ProvisioningContext) -> ComputeScheduler, + private val schedulingQuantum: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val service = ComputeService.builder(ctx.dispatcher, scheduler(ctx)) + .withQuantum(schedulingQuantum) + .build() + ctx.registry.register(serviceDomain, ComputeService::class.java, service) + + return AutoCloseable { service.close() } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt new file mode 100644 index 00000000..93f8fa4f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSteps") + +package org.opendc.compute.simulator.provisioner + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.telemetry.ComputeMonitor +import org.opendc.compute.topology.HostSpec +import java.time.Duration + +/** + * Return a [ProvisioningStep] that provisions a [ComputeService] without any hosts. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param scheduler A function to construct the compute scheduler. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public fun setupComputeService( + serviceDomain: String, + scheduler: (ProvisioningContext) -> ComputeScheduler, + schedulingQuantum: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum) +} + +/** + * Return a [ProvisioningStep] that installs a [ComputeMetricReader] to periodically collect the metrics of a + * [ComputeService] and report them to a [ComputeMonitor]. + * + * @param serviceDomain The service domain at which the [ComputeService] is located. + * @param monitor The [ComputeMonitor] to install. + * @param exportInterval The interval between which to collect the metrics. + */ +public fun registerComputeMonitor( + serviceDomain: String, + monitor: ComputeMonitor, + exportInterval: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval) +} + +/** + * Return a [ProvisioningStep] that sets up the specified list of hosts (based on [specs]) for the specified compute + * service. + * + * @param serviceDomain The domain name under which the compute service is registered. + * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. + * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. + */ +public fun setupHosts(serviceDomain: String, specs: List, optimize: Boolean = false): ProvisioningStep { + return HostsProvisioningStep(serviceDomain, specs, optimize) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt new file mode 100644 index 00000000..3104ccbe --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.topology.HostSpec +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.kernel.SimHypervisor +import org.opendc.simulator.flow2.FlowEngine +import java.util.SplittableRandom + +/** + * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. + * + * @param serviceDomain The domain name under which the compute service is registered. + * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. + * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. + */ +public class HostsProvisioningStep internal constructor( + private val serviceDomain: String, + private val specs: List, + private val optimize: Boolean +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val engine = FlowEngine.create(ctx.dispatcher) + val graph = engine.newGraph() + val hosts = mutableSetOf() + + for (spec in specs) { + val machine = SimBareMetalMachine.create(graph, spec.model, spec.psuFactory) + val hypervisor = SimHypervisor.create(spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong())) + + val host = SimHost( + spec.uid, + spec.name, + spec.meta, + ctx.dispatcher.timeSource, + machine, + hypervisor, + optimize = optimize + ) + + require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } + service.addHost(host) + } + + return AutoCloseable { + for (host in hosts) { + host.close() + } + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt new file mode 100644 index 00000000..275378e7 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.opendc.common.Dispatcher +import org.opendc.compute.simulator.MutableServiceRegistry +import org.opendc.compute.simulator.ServiceRegistry +import org.opendc.compute.simulator.ServiceRegistryImpl +import java.util.ArrayDeque +import java.util.SplittableRandom + +/** + * A helper class to set up the experimental environment in a reproducible manner. + * + * With this class, users describe the environment using multiple [ProvisioningStep]s. These re-usable + * [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared + * down after the simulation completes. + * + * @param dispatcher The [Dispatcher] implementation for scheduling future tasks. + * @param seed A seed for initializing the randomness of the environment. + */ +public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { + /** + * Implementation of [ProvisioningContext]. + */ + private val context = object : ProvisioningContext { + override val dispatcher: Dispatcher = dispatcher + override val seeder: SplittableRandom = SplittableRandom(seed) + override val registry: MutableServiceRegistry = ServiceRegistryImpl() + + override fun toString(): String = "Provisioner.ProvisioningContext" + } + + /** + * The stack of handles to run during the clean-up process. + */ + private val stack = ArrayDeque() + + /** + * The [ServiceRegistry] containing the services registered in this environment. + */ + public val registry: ServiceRegistry + get() = context.registry + + /** + * Run a single [ProvisioningStep] for this environment. + * + * @param step The step to apply to the environment. + */ + public fun runStep(step: ProvisioningStep) { + val handle = step.apply(context) + stack.push(handle) + } + + /** + * Run multiple [ProvisioningStep]s for this environment. + * + * @param steps The steps to apply to the environment. + */ + public fun runSteps(vararg steps: ProvisioningStep) { + val ctx = context + val stack = stack + for (step in steps) { + val handle = step.apply(ctx) + stack.push(handle) + } + } + + /** + * Clean-up the environment. + */ + override fun close() { + val stack = stack + while (stack.isNotEmpty()) { + stack.pop().close() + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt new file mode 100644 index 00000000..1788c8e2 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningContext.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.opendc.common.Dispatcher +import org.opendc.compute.simulator.MutableServiceRegistry +import java.util.SplittableRandom +import java.util.random.RandomGenerator + +/** + * The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as + * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow + * the provisioning steps to initialize the (simulated) resources. + */ +public interface ProvisioningContext { + /** + * The [Dispatcher] provided by the provisioner to schedule future events during the simulation. + */ + public val dispatcher: Dispatcher + + /** + * A [SplittableRandom] instance used to seed the provisioners. + */ + public val seeder: RandomGenerator + + /** + * A [MutableServiceRegistry] where the provisioned services are registered. + */ + public val registry: MutableServiceRegistry +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt new file mode 100644 index 00000000..0226a704 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.provisioner + +import org.eclipse.microprofile.config.Config + +/** + * A provisioning step is responsible for provisioning (acquiring or configuring) infrastructure necessary for a + * simulation experiment. + */ +public fun interface ProvisioningStep { + /** + * Apply the step by provisioning the required resources for the experiment using the specified + * [ProvisioningContext][ctx]. + * + * @param ctx The environment in which the resources should be provisioned. + * @return A handle that is invoked once the simulation completes, so that the resources can be cleaned up. + */ + public fun apply(ctx: ProvisioningContext): AutoCloseable + + /** + * A factory interface for [ProvisioningStep] instances. + * + * @param S The type that describes the input for constructing a [ProvisioningStep]. + */ + public abstract class Provider(public val type: Class) { + /** + * The name that identifies the provisioning step. + */ + public abstract val name: String + + /** + * Construct a [ProvisioningStep] with the specified [spec]. + * + * @param spec The specification that describes the provisioner to be created. + * @param config The external configuration of the experiment runner. + * @return The [ProvisioningStep] constructed according to [spec]. + */ + public abstract fun create(spec: S, config: Config): ProvisioningStep + } +} diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts new file mode 100644 index 00000000..c403ccb9 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcCompute.opendcComputeApi) + implementation(projects.opendcCommon) + implementation(libs.kotlin.logging) + implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) + implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service"))) + + testImplementation(projects.opendcSimulator.opendcSimulatorCore) + testRuntimeOnly(libs.log4j.core) + testRuntimeOnly(libs.log4j.slf4j) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..3a1fed1f --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -0,0 +1,508 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.telemetry.table.HostInfo +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerInfo +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param dispatcher A [Dispatcher] for scheduling the future events. + * @param service The [ComputeService] to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + dispatcher: Dispatcher, + private val service: ComputeService, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) + private val clock = dispatcher.timeSource + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + try { + while (isActive) { + delay(intervalMs) + + loggState() + } + } finally { + loggState() + + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + private fun loggState() { + try { + val now = this.clock.instant() + + for (host in this.service.hosts) { + val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + for (server in this.service.servers) { + val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader.copy()) + } catch (cause: Throwable) { + this.logger.warn(cause) { "Exporter threw an Exception" } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + + override fun copy(): ServiceTableReader { + val newServiceTable = ServiceTableReaderImpl(service) + newServiceTable.setValues(this) + + return newServiceTable + } + + override fun setValues(table: ServiceTableReader) { + _timestamp = table.timestamp + + _hostsUp = table.hostsUp + _hostsDown = table.hostsDown + _serversTotal = table.serversTotal + _serversPending = table.serversPending + _serversActive = table.serversActive + _attemptsSuccess = table.attemptsSuccess + _attemptsFailure = table.attemptsFailure + _attemptsError = table.attemptsError + } + + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversTotal: Int + get() = _serversTotal + private var _serversTotal = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversTotal = stats.serversTotal + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + override fun copy(): HostTableReader { + val newHostTable = HostTableReaderImpl(_host) + newHostTable.setValues(this) + + return newHostTable + } + + override fun setValues(table: HostTableReader) { + _timestamp = table.timestamp + _guestsTerminated = table.guestsTerminated + _guestsRunning = table.guestsRunning + _guestsError = table.guestsError + _guestsInvalid = table.guestsInvalid + _cpuLimit = table.cpuLimit + _cpuDemand = table.cpuDemand + _cpuUsage = table.cpuUsage + _cpuUtilization = table.cpuUtilization + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _powerUsage = table.powerUsage + _powerTotal = table.powerTotal + _uptime = table.uptime + _downtime = table.downtime + _bootTime = table.bootTime + } + + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + override fun copy(): ServerTableReader { + val newServerTable = ServerTableReaderImpl(service, _server) + newServerTable.setValues(this) + + return newServerTable + } + + override fun setValues(table: ServerTableReader) { + host = table.host + + _timestamp = table.timestamp + _cpuLimit = table.cpuLimit + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _uptime = table.uptime + _downtime = table.downtime + _provisionTime = table.provisionTime + _bootTime = table.bootTime + } + + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt new file mode 100644 index 00000000..b236a7df --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry + +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServerTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServiceTableReader) {} +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt new file mode 100644 index 00000000..2a4f27d4 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.opendc.compute.telemetry.ComputeMonitor +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) + } + + override fun record(reader: HostTableReader) { + hostWriter.write(reader) + } + + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..34a75d75 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import mu.KotlinLogging +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + * + * @param path The path to the file to write the data to. + * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. + */ +public abstract class ParquetDataWriter( + path: File, + private val writeSupport: WriteSupport, + bufferSize: Int = 4096 +) : AutoCloseable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The queue of records to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * An exception to be propagated to the actual writer. + */ + private var exception: Throwable? = null + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf() + var shouldStop = false + + try { + while (!shouldStop) { + try { + writer.write(queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + writer.write(data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { + return builder.build() + } + + /** + * Write the specified metrics to the database. + */ + public fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) + } + + /** + * Signal the writer to stop. + */ + override fun close() { + writerThread.interrupt() + writerThread.join() + } + + init { + writerThread.start() + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..4cf9c6f1 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File + +/** + * A Parquet event writer for [HostTableReader]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, HostDataWriteSupport(), bufferSize) { + + override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun toString(): String = "host-writer" + + /** + * A [WriteSupport] implementation for a [HostTableReader]. + */ + private class HostDataWriteSupport : WriteSupport() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: HostTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: HostTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("host_id", 1) + consumer.addBinary(Binary.fromString(data.host.id)) + consumer.endField("host_id", 1) + + consumer.startField("cpu_count", 2) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 2) + + consumer.startField("mem_capacity", 3) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_capacity", 3) + + consumer.startField("guests_terminated", 4) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 4) + + consumer.startField("guests_running", 5) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 5) + + consumer.startField("guests_error", 6) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 6) + + consumer.startField("guests_invalid", 7) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 7) + + consumer.startField("cpu_limit", 8) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_usage", 9) + consumer.addDouble(data.cpuUsage) + consumer.endField("cpu_usage", 9) + + consumer.startField("cpu_demand", 10) + consumer.addDouble(data.cpuUsage) + consumer.endField("cpu_demand", 10) + + consumer.startField("cpu_utilization", 11) + consumer.addDouble(data.cpuUtilization) + consumer.endField("cpu_utilization", 11) + + consumer.startField("cpu_time_active", 12) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 12) + + consumer.startField("cpu_time_idle", 13) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 13) + + consumer.startField("cpu_time_steal", 14) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 14) + + consumer.startField("cpu_time_lost", 15) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 15) + + consumer.startField("power_total", 16) + consumer.addDouble(data.powerTotal) + consumer.endField("power_total", 16) + + consumer.startField("uptime", 17) + consumer.addLong(data.uptime) + consumer.endField("uptime", 17) + + consumer.startField("downtime", 18) + consumer.addLong(data.downtime) + consumer.endField("downtime", 18) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 19) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 19) + } + + consumer.endMessage() + } + } + + private companion object { + /** + * The schema of the host data. + */ + val SCHEMA: MessageType = Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_demand"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_utilization"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_total"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time") + ) + .named("host") + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..6645028e --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File + +/** + * A Parquet event writer for [ServerTableReader]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, ServerDataWriteSupport(), bufferSize) { + + override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun toString(): String = "server-writer" + + /** + * A [WriteSupport] implementation for a [ServerTableReader]. + */ + private class ServerDataWriteSupport : WriteSupport() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ServerTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: ServerTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("server_id", 1) + consumer.addBinary(Binary.fromString(data.server.id)) + consumer.endField("server_id", 1) + + consumer.startField("server_name", 2) + consumer.addBinary(Binary.fromString(data.server.name)) + consumer.endField("server_name", 2) + + val hostId = data.host?.id + if (hostId != null) { + consumer.startField("host_id", 3) + consumer.addBinary(Binary.fromString(hostId)) + consumer.endField("host_id", 3) + } + + consumer.startField("mem_capacity", 4) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_capacity", 4) + + consumer.startField("cpu_count", 5) + consumer.addInteger(data.server.cpuCount) + consumer.endField("cpu_count", 5) + + consumer.startField("cpu_limit", 6) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 6) + + consumer.startField("cpu_time_active", 7) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 7) + + consumer.startField("cpu_time_idle", 8) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 8) + + consumer.startField("cpu_time_steal", 9) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 9) + + consumer.startField("cpu_time_lost", 10) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 10) + + consumer.startField("uptime", 11) + consumer.addLong(data.uptime) + consumer.endField("uptime", 11) + + consumer.startField("downtime", 12) + consumer.addLong(data.downtime) + consumer.endField("downtime", 12) + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 13) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 13) + } + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 14) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 14) + } + + consumer.endMessage() + } + } + + private companion object { + /** + * The schema of the server data. + */ + val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_name"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time") + + ) + .named("server") + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..6908a018 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServiceTableReader +import java.io.File + +/** + * A Parquet event writer for [ServiceTableReader]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, ServiceDataWriteSupport(), bufferSize) { + + override fun toString(): String = "service-writer" + + /** + * A [WriteSupport] implementation for a [ServiceTableReader]. + */ + private class ServiceDataWriteSupport : WriteSupport() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ServiceTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: ServiceTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("hosts_up", 1) + consumer.addInteger(data.hostsUp) + consumer.endField("hosts_up", 1) + + consumer.startField("hosts_down", 2) + consumer.addInteger(data.hostsDown) + consumer.endField("hosts_down", 2) + + consumer.startField("servers_pending", 3) + consumer.addInteger(data.serversPending) + consumer.endField("servers_pending", 3) + + consumer.startField("servers_active", 4) + consumer.addInteger(data.serversActive) + consumer.endField("servers_active", 4) + + consumer.startField("attempts_success", 5) + consumer.addInteger(data.attemptsSuccess) + consumer.endField("attempts_pending", 5) + + consumer.startField("attempts_failure", 6) + consumer.addInteger(data.attemptsFailure) + consumer.endField("attempts_failure", 6) + + consumer.startField("attempts_error", 7) + consumer.addInteger(data.attemptsError) + consumer.endField("attempts_error", 7) + + consumer.endMessage() + } + } + + private companion object { + private val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_up"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_down"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_pending"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_success"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_failure"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_error") + ) + .named("service") + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt new file mode 100644 index 00000000..a2e82df1 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import java.nio.ByteBuffer +import java.util.UUID + +/** + * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet. + */ +internal fun UUID.toBinary(): Binary { + val bb = ByteBuffer.allocate(16) + bb.putLong(mostSignificantBits) + bb.putLong(leastSignificantBits) + bb.rewind() + return Binary.fromConstantByteBuffer(bb) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt new file mode 100644 index 00000000..58b7853d --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +/** + * Information about a host exposed to the telemetry service. + */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..3761b4b3 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + + public fun copy(): HostTableReader + + public fun setValues(table: HostTableReader) + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt new file mode 100644 index 00000000..96c5bb13 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt new file mode 100644 index 00000000..1d1ecd10 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + + public fun copy(): ServerTableReader + + public fun setValues(table: ServerTableReader) + + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..0d8b2abd --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversTotal: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData( + timestamp, + hostsUp, + hostsDown, + serversTotal, + serversPending, + serversActive, + attemptsSuccess, + attemptsFailure, + attemptsError + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..90b0c6ea --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + + public fun copy(): ServiceTableReader + + public fun setValues(table: ServiceTableReader) + + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are registered with the compute service.. + */ + public val serversTotal: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. + */ + public val attemptsError: Int +} diff --git a/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml b/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + diff --git a/opendc-compute/opendc-compute-topology/build.gradle.kts b/opendc-compute/opendc-compute-topology/build.gradle.kts new file mode 100644 index 00000000..d4c084c0 --- /dev/null +++ b/opendc-compute/opendc-compute-topology/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Topology implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcCompute.opendcComputeApi) + implementation(projects.opendcCommon) + implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute"))) + + implementation(libs.jackson.dataformat.csv) + testImplementation(projects.opendcSimulator.opendcSimulatorCore) +} diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt new file mode 100644 index 00000000..e36c4e1e --- /dev/null +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.topology + +/** + * Definition of a compute cluster modeled in the simulation. + * + * @param id A unique identifier representing the compute cluster. + * @param name The name of the cluster. + * @param cpuCount The total number of CPUs in the cluster. + * @param cpuSpeed The speed of a CPU in the cluster in MHz. + * @param memCapacity The total memory capacity of the cluster (in MiB). + * @param hostCount The number of hosts in the cluster. + * @param memCapacityPerHost The memory capacity per host in the cluster (MiB). + * @param cpuCountPerHost The number of CPUs per host in the cluster. + */ +public data class ClusterSpec( + val id: String, + val name: String, + val cpuCount: Int, + val cpuSpeed: Double, + val memCapacity: Double, + val hostCount: Int, + val memCapacityPerHost: Double, + val cpuCountPerHost: Int +) diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt new file mode 100644 index 00000000..a1e9bc3d --- /dev/null +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.topology + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.MappingIterator +import com.fasterxml.jackson.databind.ObjectReader +import com.fasterxml.jackson.dataformat.csv.CsvMapper +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import java.io.File +import java.io.InputStream + +/** + * A helper class for reading a cluster specification file. + */ +public class ClusterSpecReader { + /** + * The [CsvMapper] to map the environment file to an object. + */ + private val mapper = CsvMapper() + + /** + * The [ObjectReader] to convert the lines into objects. + */ + private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema) + + /** + * Read the specified [file]. + */ + public fun read(file: File): List { + return reader.readValues(file).use { read(it) } + } + + /** + * Read the specified [input]. + */ + public fun read(input: InputStream): List { + return reader.readValues(input).use { read(it) } + } + + /** + * Convert the specified [MappingIterator] into a list of [ClusterSpec]s. + */ + private fun read(it: MappingIterator): List { + val result = mutableListOf() + + for (entry in it) { + val def = ClusterSpec( + entry.id, + entry.name, + entry.cpuCount, + entry.cpuSpeed * 1000, // Convert to MHz + entry.memCapacity * 1000, // Convert to MiB + entry.hostCount, + entry.memCapacityPerHost * 1000, + entry.cpuCountPerHost + ) + result.add(def) + } + + return result + } + + private open class Entry( + @JsonProperty("ClusterID") + val id: String, + @JsonProperty("ClusterName") + val name: String, + @JsonProperty("Cores") + val cpuCount: Int, + @JsonProperty("Speed") + val cpuSpeed: Double, + @JsonProperty("Memory") + val memCapacity: Double, + @JsonProperty("numberOfHosts") + val hostCount: Int, + @JsonProperty("memoryCapacityPerHost") + val memCapacityPerHost: Double, + @JsonProperty("coreCountPerHost") + val cpuCountPerHost: Int + ) + + public companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("ClusterID", CsvSchema.ColumnType.STRING) + .addColumn("ClusterName", CsvSchema.ColumnType.STRING) + .addColumn("Cores", CsvSchema.ColumnType.NUMBER) + .addColumn("Speed", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory", CsvSchema.ColumnType.NUMBER) + .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER) + .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER) + .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setColumnSeparator(';') + .setUseHeader(true) + .build() + } +} diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt new file mode 100644 index 00000000..596121b0 --- /dev/null +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.topology + +import org.opendc.simulator.compute.SimPsuFactories +import org.opendc.simulator.compute.SimPsuFactory +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory +import java.util.UUID + +/** + * Description of a physical host that will be simulated by OpenDC and host the virtual machines. + * + * @param uid Unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param model The physical model of the machine. + * @param psuFactory The [SimPsuFactory] to construct the PSU that models the power consumption of the machine. + * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. + */ +public data class HostSpec( + val uid: UUID, + val name: String, + val meta: Map, + val model: MachineModel, + val psuFactory: SimPsuFactory = SimPsuFactories.noop(), + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() +) diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt new file mode 100644 index 00000000..5f0fe511 --- /dev/null +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TopologyFactories") + +package org.opendc.compute.topology + +import org.opendc.simulator.compute.SimPsuFactories +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.CpuPowerModel +import org.opendc.simulator.compute.power.CpuPowerModels +import java.io.File +import java.io.InputStream +import java.util.SplittableRandom +import java.util.UUID +import java.util.random.RandomGenerator +import kotlin.math.roundToLong + +/** + * A [ClusterSpecReader] that is used to read the cluster definition file. + */ +private val reader = ClusterSpecReader() + +/** + * Construct a topology from the specified [file]. + */ +public fun clusterTopology( + file: File, + powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), + random: RandomGenerator = SplittableRandom(0) +): List { + return clusterTopology(reader.read(file), powerModel, random) +} + +/** + * Construct a topology from the specified [input]. + */ +public fun clusterTopology( + input: InputStream, + powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), + random: RandomGenerator = SplittableRandom(0) +): List { + return clusterTopology(reader.read(input), powerModel, random) +} + +/** + * Construct a topology from the given list of [clusters]. + */ +public fun clusterTopology(clusters: List, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List { + return clusters.flatMap { it.toHostSpecs(random, powerModel) } +} + +/** + * Helper method to convert a [ClusterSpec] into a list of [HostSpec]s. + */ +private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List { + val cpuSpeed = cpuSpeed + val memoryPerHost = memCapacityPerHost.roundToLong() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cpuCountPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val machineModel = MachineModel( + List(cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, + listOf(unknownMemoryUnit) + ) + + return List(hostCount) { + HostSpec( + UUID(random.nextLong(), it.toLong()), + "node-$name-$it", + mapOf("cluster" to id), + machineModel, + SimPsuFactories.simple(powerModel) + ) + } +} diff --git a/opendc-compute/opendc-compute-topology/src/test/resources/log4j2.xml b/opendc-compute/opendc-compute-topology/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/opendc-compute/opendc-compute-topology/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts new file mode 100644 index 00000000..905f905c --- /dev/null +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcCompute.opendcComputeApi) + implementation(projects.opendcCommon) + implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-api"))) + implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute"))) + + implementation(libs.kotlin.logging) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt new file mode 100644 index 00000000..a802afdb --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import java.util.random.RandomGenerator + +/** + * An interface that describes how a workload is resolved. + */ +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt new file mode 100644 index 00000000..c5fb3e56 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import mu.KotlinLogging +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.trace.Trace +import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS +import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE +import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY +import org.opendc.trace.conv.RESOURCE_CPU_COUNT +import org.opendc.trace.conv.RESOURCE_ID +import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY +import org.opendc.trace.conv.RESOURCE_START_TIME +import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE +import org.opendc.trace.conv.RESOURCE_STATE_DURATION +import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS +import org.opendc.trace.conv.TABLE_RESOURCES +import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import java.io.File +import java.lang.ref.SoftReference +import java.time.Duration +import java.time.Instant +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.roundToLong + +/** + * A helper class for loading compute workload traces into memory. + * + * @param baseDir The directory containing the traces. + */ +public class ComputeWorkloadLoader(private val baseDir: File) { + /** + * The logger for this instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap>>() + + /** + * Read the fragments into memory. + */ + private fun parseFragments(trace: Trace): Map { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + + val fragments = mutableMapOf() + + return try { + while (reader.nextRow()) { + val id = reader.getString(idCol)!! + val time = reader.getInstant(timestampCol)!! + val durationMs = reader.getDuration(durationCol)!! + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) + + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(time, durationMs, cpuUsage, cores) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(trace: Trace, fragments: Map, interferenceModel: VmInterferenceModel): List { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + + var counter = 0 + val entries = mutableListOf() + + return try { + while (reader.nextRow()) { + val id = reader.getString(idCol)!! + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = reader.getInstant(startTimeCol)!! + val endTime = reader.getInstant(stopTimeCol)!! + val cpuCount = reader.getInt(cpuCountCol) + val cpuCapacity = reader.getDouble(cpuCapacityCol) + val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad + + entries.add( + VirtualMachine( + uid, + id, + cpuCount, + cpuCapacity, + memCapacity.roundToLong(), + totalLoad, + submissionTime, + endTime, + builder.build(), + interferenceModel.getProfile(id) + ) + ) + } + + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * Read the interference model associated with the specified [trace]. + */ + private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { + val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() + + return try { + val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) + val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) + val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) + + val modelBuilder = VmInterferenceModel.builder() + + while (reader.nextRow()) { + val members = reader.getSet(membersCol, String::class.java)!! + val target = reader.getDouble(targetCol) + val score = reader.getDouble(scoreCol) + + modelBuilder + .addGroup(members, target, score) + } + + modelBuilder.build() + } finally { + reader.close() + } + } + + /** + * Load the trace with the specified [name] and [format]. + */ + public fun get(name: String, format: String): List { + val ref = cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { + val path = baseDir.resolve(key) + + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val interferenceModel = parseInterferenceModel(trace) + val vms = parseMeta(trace, fragments, interferenceModel) + + SoftReference(vms) + } else { + oldVal + } + } + + return checkNotNull(ref?.get()) { "Memory pressure" } + } + + /** + * Clear the workload cache. + */ + public fun reset() { + cache.clear() + } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * The deadline of the previous fragment. + */ + private var previousDeadline = Long.MIN_VALUE + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) { + val startTimeMs = (deadline - duration).toEpochMilli() + totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs + + if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) { + // There is a gap between the previous and current fragment; fill the gap + builder.add(startTimeMs, 0.0, cores) + } + + builder.add(deadline.toEpochMilli(), usage, cores) + previousDeadline = deadline.toEpochMilli() + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt new file mode 100644 index 00000000..61a6e3a0 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") + +package org.opendc.compute.workload + +import org.opendc.compute.workload.internal.CompositeComputeWorkload +import org.opendc.compute.workload.internal.HpcSampledComputeWorkload +import org.opendc.compute.workload.internal.LoadSampledComputeWorkload +import org.opendc.compute.workload.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt new file mode 100644 index 00000000..622b3c55 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile +import org.opendc.simulator.compute.workload.SimTrace +import java.time.Instant +import java.util.UUID + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM in MB. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. + */ +public data class VirtualMachine( + val uid: UUID, + val name: String, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: SimTrace, + val interferenceProfile: VmInterferenceProfile? +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..1ac5f4ad --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.random.RandomGenerator + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..fdb599c1 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.UUID +import java.util.random.RandomGenerator + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf() + hpc.mapTo(res) { sample(it, index) } + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf() + nonHpc.mapTo(res) { sample(it, index) } + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..6014f37a --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.random.RandomGenerator + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List { + val vms = source.resolve(loader, random) + val res = mutableListOf() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt new file mode 100644 index 00000000..ff88fa3e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.random.RandomGenerator + +/** + * A [ComputeWorkload] from a trace. + */ +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List { + return loader.get(name, format) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/log4j2.xml b/opendc-compute/opendc-compute-workload/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + -- cgit v1.2.3