diff options
Diffstat (limited to 'opendc-experiments')
47 files changed, 3006 insertions, 39 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index b19ce750..e19784ba 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -32,9 +32,8 @@ plugins { } dependencies { - api(projects.opendcCompute.opendcComputeWorkload) + api(projects.opendcExperiments.opendcExperimentsCompute) - implementation(projects.opendcExperiments.opendcExperimentsCompute) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcCompute.opendcComputeSimulator) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 378c3833..db56f75d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -28,11 +28,9 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.* -import org.opendc.compute.workload.topology.Topology import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.compute.setupComputeService -import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.topology.Topology import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import org.openjdk.jmh.annotations.* diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 6bd470f3..e019af34 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -23,14 +23,10 @@ package org.opendc.experiments.capelin import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor -import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.replay import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.export.parquet.ParquetComputeMonitor import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt index a2e71243..ed2588f0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.model -import org.opendc.compute.workload.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkload /** * A single workload originating from a trace. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt index 68eb15b3..80b8859c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.composite -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.composite +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of a composite workload. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt index 0d7f3072..f3c002ac 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the difference between horizontal and vertical scaling. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt index 6afffc09..22f9f3ac 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt @@ -22,13 +22,13 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByHpc -import org.opendc.compute.workload.sampleByHpcLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByHpc +import org.opendc.experiments.compute.sampleByHpcLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] to explore the effect of HPC workloads. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt index 92bf80b3..e63a5807 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt index f9a9d681..12570108 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of operational phenomena on metrics. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt index 944e9f43..6f126b87 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt @@ -22,11 +22,11 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.trace /** * A [Portfolio] to perform a simple test run. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt index 5ab4261a..d152a6db 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt @@ -23,8 +23,8 @@ @file:JvmName("TopologyFactories") package org.opendc.experiments.capelin.topology -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.compute.workload.topology.Topology +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.compute.topology.Topology import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 3407e30f..ae1ca807 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,13 +32,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.ComputeMonitor -import org.opendc.compute.workload.telemetry.table.HostTableReader -import org.opendc.compute.workload.telemetry.table.ServiceTableReader -import org.opendc.compute.workload.topology.Topology import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.topology.Topology import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File diff --git a/opendc-experiments/opendc-experiments-compute/build.gradle.kts b/opendc-experiments/opendc-experiments-compute/build.gradle.kts index 70f16199..5cae1d43 100644 --- a/opendc-experiments/opendc-experiments-compute/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-compute/build.gradle.kts @@ -30,6 +30,16 @@ plugins { } dependencies { + api(projects.opendcCompute.opendcComputeService) api(projects.opendcExperiments.opendcExperimentsBase) - api(projects.opendcCompute.opendcComputeWorkload) + api(projects.opendcCompute.opendcComputeSimulator) + + implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcSimulator.opendcSimulatorCore) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + + implementation(libs.kotlin.logging) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt new file mode 100644 index 00000000..1731a4ac --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.compute + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt index ce36fac8..3ae4b0df 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt @@ -25,9 +25,9 @@ package org.opendc.experiments.compute import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.telemetry.ComputeMonitor -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.ComputeMonitorProvisioningStep +import org.opendc.experiments.compute.topology.HostSpec import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep import java.time.Duration diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt new file mode 100644 index 00000000..3db980ca --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import java.util.* + +/** + * An interface that describes how a workload is resolved. + */ +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt new file mode 100644 index 00000000..f92e10e3 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import mu.KotlinLogging +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.trace.* +import org.opendc.trace.conv.* +import java.io.File +import java.lang.ref.SoftReference +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A helper class for loading compute workload traces into memory. + * + * @param baseDir The directory containing the traces. + */ +public class ComputeWorkloadLoader(private val baseDir: File) { + /** + * The logger for this instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>() + + /** + * Read the fragments into memory. + */ + private fun parseFragments(trace: Trace): Map<String, Builder> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + + val fragments = mutableMapOf<String, Builder>() + + return try { + while (reader.nextRow()) { + val id = reader.getString(idCol)!! + val time = reader.getInstant(timestampCol)!! + val duration = reader.getDuration(durationCol)!! + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) + + val deadlineMs = time.toEpochMilli() + val timeMs = (time - duration).toEpochMilli() + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(timeMs, deadlineMs, cpuUsage, cores) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + + var counter = 0 + val entries = mutableListOf<VirtualMachine>() + + return try { + while (reader.nextRow()) { + + val id = reader.getString(idCol)!! + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = reader.getInstant(startTimeCol)!! + val endTime = reader.getInstant(stopTimeCol)!! + val cpuCount = reader.getInt(cpuCountCol) + val cpuCapacity = reader.getDouble(cpuCapacityCol) + val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad + + entries.add( + VirtualMachine( + uid, + id, + cpuCount, + cpuCapacity, + memCapacity.roundToLong(), + totalLoad, + submissionTime, + endTime, + builder.build(), + interferenceModel.getProfile(id) + ) + ) + } + + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * Read the interference model associated with the specified [trace]. + */ + private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { + val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() + + return try { + val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) + val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) + val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) + + val modelBuilder = VmInterferenceModel.builder() + + while (reader.nextRow()) { + val members = reader.getSet(membersCol, String::class.java)!! + val target = reader.getDouble(targetCol) + val score = reader.getDouble(scoreCol) + + modelBuilder + .addGroup(members, target, score) + } + + modelBuilder.build() + } finally { + reader.close() + } + } + + /** + * Load the trace with the specified [name] and [format]. + */ + public fun get(name: String, format: String): List<VirtualMachine> { + val ref = cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { + + val path = baseDir.resolve(key) + + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val interferenceModel = parseInterferenceModel(trace) + val vms = parseMeta(trace, fragments, interferenceModel) + + SoftReference(vms) + } else { + oldVal + } + } + + return checkNotNull(ref?.get()) { "Memory pressure" } + } + + /** + * Clear the workload cache. + */ + public fun reset() { + cache.clear() + } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * The deadline of the previous fragment. + */ + private var previousDeadline = Long.MIN_VALUE + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val duration = max(0, deadline - timestamp) + totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + + if (timestamp != previousDeadline) { + // There is a gap between the previous and current fragment; fill the gap + builder.add(timestamp, 0.0, cores) + } + + builder.add(deadline, usage, cores) + previousDeadline = deadline + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt new file mode 100644 index 00000000..732f761e --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") +package org.opendc.experiments.compute + +import org.opendc.experiments.compute.internal.CompositeComputeWorkload +import org.opendc.experiments.compute.internal.HpcSampledComputeWorkload +import org.opendc.experiments.compute.internal.LoadSampledComputeWorkload +import org.opendc.experiments.compute.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt new file mode 100644 index 00000000..f96b7e16 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +public interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt new file mode 100644 index 00000000..00bf44a1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.compute + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import java.time.Clock +import java.time.Duration +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +public fun grid5000(failureInterval: Duration): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService, + random: Random + ): HostFaultInjector { + val rng = Well19937c(random.nextLong()) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index 6aca4ab7..28c9bc01 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.compute import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.topology.HostSpec import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep import org.opendc.simulator.compute.SimBareMetalMachine diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt new file mode 100644 index 00000000..0df9305a --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.experiments.compute + +import kotlinx.coroutines.* +import org.opendc.compute.service.ComputeService +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.time.Clock +import java.util.* +import kotlin.coroutines.coroutineContext +import kotlin.math.max + +/** + * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * + * @param clock The simulation clock. + * @param trace The trace to simulate. + * @param seed The seed to use for randomness. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModel A failure model to use for injecting failures. + * @param interference A flag to indicate that VM interference needs to be enabled. + */ +public suspend fun ComputeService.replay( + clock: Clock, + trace: List<VirtualMachine>, + seed: Long, + submitImmediately: Boolean = false, + failureModel: FailureModel? = null, + interference: Boolean = false +) { + val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed)) + val client = newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + if (offset < 0) { + offset = start - now + } + + // Make sure the trace entries are ordered by submission time + assert(start - offset >= 0) { "Invalid trace order" } + + if (!submitImmediately) { + delay(max(0, (start - offset) - now)) + } + + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + val meta = mutableMapOf<String, Any>("workload" to workload) + + val interferenceProfile = entry.interferenceProfile + if (interference && interferenceProfile != null) { + meta["interference-profile"] = interferenceProfile + } + + launch { + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() + ), + meta = meta + ) + + // Wait for the server reach its end time + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + + // Stop the server after reaching the end-time of the virtual machine + server.stop() + } + } + } + + yield() + } finally { + injector?.close() + client.close() + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt new file mode 100644 index 00000000..3ed497a0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile +import org.opendc.simulator.compute.workload.SimTrace +import java.time.Instant +import java.util.* + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM in MB. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. + */ +public data class VirtualMachine( + val uid: UUID, + val name: String, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: SimTrace, + val interferenceProfile: VmInterferenceProfile? +) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt new file mode 100644 index 00000000..a104851f --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) + } + + override fun record(reader: HostTableReader) { + hostWriter.write(reader) + } + + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..60629a95 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import mu.KotlinLogging +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + * + * @param path The path to the file to write the data to. + * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. + */ +public abstract class ParquetDataWriter<in T>( + path: File, + private val writeSupport: WriteSupport<T>, + bufferSize: Int = 4096 +) : AutoCloseable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The queue of records to process. + */ + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + + /** + * An exception to be propagated to the actual writer. + */ + private var exception: Throwable? = null + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false + + try { + while (!shouldStop) { + try { + writer.write(queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + writer.write(data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { + return builder.build() + } + + /** + * Write the specified metrics to the database. + */ + public fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) + } + + /** + * Signal the writer to stop. + */ + override fun close() { + writerThread.interrupt() + writerThread.join() + } + + init { + writerThread.start() + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..cf0a3bf2 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File +import java.util.* + +/** + * A Parquet event writer for [HostTableReader]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { + + override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun toString(): String = "host-writer" + + /** + * A [WriteSupport] implementation for a [HostTableReader]. + */ + private class HostDataWriteSupport : WriteSupport<HostTableReader>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: HostTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: HostTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("host_id", 1) + consumer.addBinary(UUID.fromString(data.host.id).toBinary()) + consumer.endField("host_id", 1) + + consumer.startField("uptime", 2) + consumer.addLong(data.uptime) + consumer.endField("uptime", 2) + + consumer.startField("downtime", 3) + consumer.addLong(data.downtime) + consumer.endField("downtime", 3) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 4) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 4) + } + + consumer.startField("cpu_count", 5) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 5) + + consumer.startField("cpu_limit", 6) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 6) + + consumer.startField("cpu_time_active", 7) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 7) + + consumer.startField("cpu_time_idle", 8) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 8) + + consumer.startField("cpu_time_steal", 9) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 9) + + consumer.startField("cpu_time_lost", 10) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 10) + + consumer.startField("mem_limit", 11) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_limit", 11) + + consumer.startField("power_total", 12) + consumer.addDouble(data.powerTotal) + consumer.endField("power_total", 12) + + consumer.startField("guests_terminated", 13) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 13) + + consumer.startField("guests_running", 14) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 14) + + consumer.startField("guests_error", 15) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 15) + + consumer.startField("guests_invalid", 16) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 16) + + consumer.endMessage() + } + } + + private companion object { + /** + * The schema of the host data. + */ + val SCHEMA: MessageType = Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_total"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + ) + .named("host") + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..1622289e --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.io.File +import java.util.* + +/** + * A Parquet event writer for [ServerTableReader]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { + + override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun toString(): String = "server-writer" + + /** + * A [WriteSupport] implementation for a [ServerTableReader]. + */ + private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ServerTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: ServerTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("server_id", 1) + consumer.addBinary(UUID.fromString(data.server.id).toBinary()) + consumer.endField("server_id", 1) + + val hostId = data.host?.id + if (hostId != null) { + consumer.startField("host_id", 2) + consumer.addBinary(UUID.fromString(hostId).toBinary()) + consumer.endField("host_id", 2) + } + + consumer.startField("uptime", 3) + consumer.addLong(data.uptime) + consumer.endField("uptime", 3) + + consumer.startField("downtime", 4) + consumer.addLong(data.downtime) + consumer.endField("downtime", 4) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 5) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 5) + } + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 6) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 6) + } + + consumer.startField("cpu_count", 7) + consumer.addInteger(data.server.cpuCount) + consumer.endField("cpu_count", 7) + + consumer.startField("cpu_limit", 8) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_time_active", 9) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 9) + + consumer.startField("cpu_time_idle", 10) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 10) + + consumer.startField("cpu_time_steal", 11) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 11) + + consumer.startField("cpu_time_lost", 12) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 12) + + consumer.startField("mem_limit", 13) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_limit", 13) + + consumer.endMessage() + } + } + + private companion object { + /** + * The schema of the server data. + */ + val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("server_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit") + ) + .named("server") + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..0c466d39 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader +import java.io.File + +/** + * A Parquet event writer for [ServiceTableReader]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) { + + override fun toString(): String = "service-writer" + + /** + * A [WriteSupport] implementation for a [ServiceTableReader]. + */ + private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ServiceTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: ServiceTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("hosts_up", 1) + consumer.addInteger(data.hostsUp) + consumer.endField("hosts_up", 1) + + consumer.startField("hosts_down", 2) + consumer.addInteger(data.hostsDown) + consumer.endField("hosts_down", 2) + + consumer.startField("servers_pending", 3) + consumer.addInteger(data.serversPending) + consumer.endField("servers_pending", 3) + + consumer.startField("servers_active", 4) + consumer.addInteger(data.serversActive) + consumer.endField("servers_active", 4) + + consumer.startField("attempts_success", 5) + consumer.addInteger(data.attemptsSuccess) + consumer.endField("attempts_pending", 5) + + consumer.startField("attempts_failure", 6) + consumer.addInteger(data.attemptsFailure) + consumer.endField("attempts_failure", 6) + + consumer.startField("attempts_error", 7) + consumer.addInteger(data.attemptsError) + consumer.endField("attempts_error", 7) + + consumer.endMessage() + } + } + + private companion object { + private val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_up"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_down"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_pending"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_success"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_failure"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_error"), + ) + .named("service") + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt new file mode 100644 index 00000000..a3f2d597 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.apache.parquet.io.api.Binary +import java.nio.ByteBuffer +import java.util.UUID + +/** + * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet. + */ +internal fun UUID.toBinary(): Binary { + val bb = ByteBuffer.allocate(16) + bb.putLong(mostSignificantBits) + bb.putLong(leastSignificantBits) + bb.rewind() + return Binary.fromConstantByteBuffer(bb) +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..75779088 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.internal + +import mu.KotlinLogging +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf<VirtualMachine>() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..23efb154 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.internal + +import mu.KotlinLogging +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf<VirtualMachine>() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..4663c59e --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.internal + +import mu.KotlinLogging +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + val res = mutableListOf<VirtualMachine>() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt new file mode 100644 index 00000000..1cfee3bd --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.internal + +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] from a trace. + */ +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + return loader.get(name, format) + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..088f98e9 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -0,0 +1,427 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.experiments.compute.telemetry.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + val service = service + val monitor = monitor + val hostTableReaders = hostTableReaders + val serverTableReaders = serverTableReaders + val serviceTableReader = serviceTableReader + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in service.servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt new file mode 100644 index 00000000..ff36bef3 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry + +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServerTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServiceTableReader) {} +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt index 0be4953b..68ca5ae8 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeMonitorProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt @@ -20,14 +20,12 @@ * SOFTWARE. */ -package org.opendc.experiments.compute +package org.opendc.experiments.compute.telemetry import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.telemetry.ComputeMonitor import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep import java.time.Duration diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt new file mode 100644 index 00000000..84dd7a4f --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +/** + * Information about a host exposed to the telemetry service. + */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..e6953550 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt new file mode 100644 index 00000000..fc360fee --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt new file mode 100644 index 00000000..c4e2fb4c --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..394c6bd6 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..0155a879 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. + */ + public val attemptsError: Int +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt new file mode 100644 index 00000000..c0ac0fbf --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.topology + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.* + +/** + * Description of a physical host that will be simulated by OpenDC and host the virtual machines. + * + * @param uid Unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param model The physical model of the machine. + * @param powerDriver The [PowerDriver] to model the power consumption of the machine. + * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. + */ +public data class HostSpec( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerDriver: PowerDriver, + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() +) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt new file mode 100644 index 00000000..bd29110f --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.topology + +/** + * Representation of the environment of the compute service, describing the physical details of every host. + */ +public interface Topology { + /** + * Resolve the [Topology] into a list of [HostSpec]s. + */ + public fun resolve(): List<HostSpec> +} diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt new file mode 100644 index 00000000..52b94324 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.experiments.compute.telemetry.table.HostInfo +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetHostDataWriter] + */ +class HostDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : HostTableReader { + override val timestamp: Instant = Instant.now() + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val guestsTerminated: Int = 0 + override val guestsRunning: Int = 0 + override val guestsError: Int = 0 + override val guestsInvalid: Int = 0 + override val cpuLimit: Double = 4096.0 + override val cpuUsage: Double = 1.0 + override val cpuDemand: Double = 1.0 + override val cpuUtilization: Double = 0.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val powerUsage: Double = 1.0 + override val powerTotal: Double = 1.0 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt new file mode 100644 index 00000000..0ba93173 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.experiments.compute.telemetry.table.HostInfo +import org.opendc.experiments.compute.telemetry.table.ServerInfo +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServerDataWriter] + */ +class ServerDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServerTableReader { + override val timestamp: Instant = Instant.now() + override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096) + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val cpuLimit: Double = 4096.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val provisionTime: Instant = timestamp + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt new file mode 100644 index 00000000..20301185 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServiceDataWriter] + */ +class ServiceDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServiceTableReader { + override val timestamp: Instant = Instant.now() + override val hostsUp: Int = 1 + override val hostsDown: Int = 0 + override val serversPending: Int = 1 + override val serversActive: Int = 1 + override val attemptsSuccess: Int = 1 + override val attemptsFailure: Int = 0 + override val attemptsError: Int = 0 + }) + } + } +} |
