diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-compute')
35 files changed, 0 insertions, 3425 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/build.gradle.kts b/opendc-experiments/opendc-experiments-compute/build.gradle.kts deleted file mode 100644 index 5cae1d43..00000000 --- a/opendc-experiments/opendc-experiments-compute/build.gradle.kts +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Support library for simulating VM-based workloads with OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` - `testing-conventions` - `jacoco-conventions` -} - -dependencies { - api(projects.opendcCompute.opendcComputeService) - api(projects.opendcExperiments.opendcExperimentsBase) - api(projects.opendcCompute.opendcComputeSimulator) - - implementation(projects.opendcTrace.opendcTraceApi) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - - implementation(libs.kotlin.logging) - - testImplementation(libs.slf4j.simple) -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt deleted file mode 100644 index 125ba6ef..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ComputeSchedulers") - -package org.opendc.experiments.compute - -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import java.util.SplittableRandom -import java.util.random.RandomGenerator - -/** - * Create a [ComputeScheduler] for the experiment. - */ -public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (name) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = SplittableRandom(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(placements) - else -> throw IllegalArgumentException("Unknown policy $name") - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt deleted file mode 100644 index 4470c418..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.experiments.provisioner.ProvisioningContext -import org.opendc.experiments.provisioner.ProvisioningStep -import java.time.Duration - -/** - * A [ProvisioningStep] that provisions a [ComputeService] without any hosts. - * - * @param serviceDomain The domain name under which to register the compute service. - * @param scheduler A function to construct the compute scheduler. - * @param schedulingQuantum The scheduling quantum of the compute scheduler. - */ -public class ComputeServiceProvisioningStep internal constructor( - private val serviceDomain: String, - private val scheduler: (ProvisioningContext) -> ComputeScheduler, - private val schedulingQuantum: Duration -) : ProvisioningStep { - override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = ComputeService.builder(ctx.dispatcher, scheduler(ctx)) - .withQuantum(schedulingQuantum) - .build() - ctx.registry.register(serviceDomain, ComputeService::class.java, service) - - return AutoCloseable { service.close() } - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt deleted file mode 100644 index 690156e2..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ComputeSteps") - -package org.opendc.experiments.compute - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.experiments.compute.telemetry.ComputeMonitor -import org.opendc.experiments.compute.telemetry.ComputeMonitorProvisioningStep -import org.opendc.experiments.compute.topology.HostSpec -import org.opendc.experiments.provisioner.ProvisioningContext -import org.opendc.experiments.provisioner.ProvisioningStep -import java.time.Duration - -/** - * Return a [ProvisioningStep] that provisions a [ComputeService] without any hosts. - * - * @param serviceDomain The domain name under which to register the compute service. - * @param scheduler A function to construct the compute scheduler. - * @param schedulingQuantum The scheduling quantum of the compute scheduler. - */ -public fun setupComputeService( - serviceDomain: String, - scheduler: (ProvisioningContext) -> ComputeScheduler, - schedulingQuantum: Duration = Duration.ofMinutes(5) -): ProvisioningStep { - return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum) -} - -/** - * Return a [ProvisioningStep] that installs a [ComputeMetricReader] to periodically collect the metrics of a - * [ComputeService] and report them to a [ComputeMonitor]. - * - * @param serviceDomain The service domain at which the [ComputeService] is located. - * @param monitor The [ComputeMonitor] to install. - * @param exportInterval The interval between which to collect the metrics. - */ -public fun registerComputeMonitor( - serviceDomain: String, - monitor: ComputeMonitor, - exportInterval: Duration = Duration.ofMinutes(5) -): ProvisioningStep { - return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval) -} - -/** - * Return a [ProvisioningStep] that sets up the specified list of hosts (based on [specs]) for the specified compute - * service. - * - * @param serviceDomain The domain name under which the compute service is registered. - * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. - * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. - */ -public fun setupHosts(serviceDomain: String, specs: List<HostSpec>, optimize: Boolean = false): ProvisioningStep { - return HostsProvisioningStep(serviceDomain, specs, optimize) -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt deleted file mode 100644 index b7884293..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import java.util.random.RandomGenerator - -/** - * An interface that describes how a workload is resolved. - */ -public interface ComputeWorkload { - /** - * Resolve the workload into a list of [VirtualMachine]s to simulate. - */ - public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt deleted file mode 100644 index 29f012cd..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import mu.KotlinLogging -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimTrace -import org.opendc.trace.Trace -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import java.io.File -import java.lang.ref.SoftReference -import java.time.Duration -import java.time.Instant -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import kotlin.math.roundToLong - -/** - * A helper class for loading compute workload traces into memory. - * - * @param baseDir The directory containing the traces. - */ -public class ComputeWorkloadLoader(private val baseDir: File) { - /** - * The logger for this instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The cache of workloads. - */ - private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>() - - /** - * Read the fragments into memory. - */ - private fun parseFragments(trace: Trace): Map<String, Builder> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val durationCol = reader.resolve(RESOURCE_STATE_DURATION) - val coresCol = reader.resolve(RESOURCE_CPU_COUNT) - val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) - - val fragments = mutableMapOf<String, Builder>() - - return try { - while (reader.nextRow()) { - val id = reader.getString(idCol)!! - val time = reader.getInstant(timestampCol)!! - val durationMs = reader.getDuration(durationCol)!! - val cores = reader.getInt(coresCol) - val cpuUsage = reader.getDouble(usageCol) - - val builder = fragments.computeIfAbsent(id) { Builder() } - builder.add(time, durationMs, cpuUsage, cores) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - - val idCol = reader.resolve(RESOURCE_ID) - val startTimeCol = reader.resolve(RESOURCE_START_TIME) - val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) - val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) - - var counter = 0 - val entries = mutableListOf<VirtualMachine>() - - return try { - while (reader.nextRow()) { - val id = reader.getString(idCol)!! - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = reader.getInstant(startTimeCol)!! - val endTime = reader.getInstant(stopTimeCol)!! - val cpuCount = reader.getInt(cpuCountCol) - val cpuCapacity = reader.getDouble(cpuCapacityCol) - val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val builder = fragments.getValue(id) - val totalLoad = builder.totalLoad - - entries.add( - VirtualMachine( - uid, - id, - cpuCount, - cpuCapacity, - memCapacity.roundToLong(), - totalLoad, - submissionTime, - endTime, - builder.build(), - interferenceModel.getProfile(id) - ) - ) - } - - // Make sure the virtual machines are ordered by start time - entries.sortBy { it.startTime } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - reader.close() - } - } - - /** - * Read the interference model associated with the specified [trace]. - */ - private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { - val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() - - return try { - val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) - val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) - val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) - - val modelBuilder = VmInterferenceModel.builder() - - while (reader.nextRow()) { - val members = reader.getSet(membersCol, String::class.java)!! - val target = reader.getDouble(targetCol) - val score = reader.getDouble(scoreCol) - - modelBuilder - .addGroup(members, target, score) - } - - modelBuilder.build() - } finally { - reader.close() - } - } - - /** - * Load the trace with the specified [name] and [format]. - */ - public fun get(name: String, format: String): List<VirtualMachine> { - val ref = cache.compute(name) { key, oldVal -> - val inst = oldVal?.get() - if (inst == null) { - val path = baseDir.resolve(key) - - logger.info { "Loading trace $key at $path" } - - val trace = Trace.open(path, format) - val fragments = parseFragments(trace) - val interferenceModel = parseInterferenceModel(trace) - val vms = parseMeta(trace, fragments, interferenceModel) - - SoftReference(vms) - } else { - oldVal - } - } - - return checkNotNull(ref?.get()) { "Memory pressure" } - } - - /** - * Clear the workload cache. - */ - public fun reset() { - cache.clear() - } - - /** - * A builder for a VM trace. - */ - private class Builder { - /** - * The total load of the trace. - */ - @JvmField var totalLoad: Double = 0.0 - - /** - * The internal builder for the trace. - */ - private val builder = SimTrace.builder() - - /** - * The deadline of the previous fragment. - */ - private var previousDeadline = Long.MIN_VALUE - - /** - * Add a fragment to the trace. - * - * @param timestamp Timestamp at which the fragment starts (in epoch millis). - * @param deadline Timestamp at which the fragment ends (in epoch millis). - * @param usage CPU usage of this fragment. - * @param cores Number of cores used. - */ - fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) { - val startTimeMs = (deadline - duration).toEpochMilli() - totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs - - if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) { - // There is a gap between the previous and current fragment; fill the gap - builder.add(startTimeMs, 0.0, cores) - } - - builder.add(deadline.toEpochMilli(), usage, cores) - previousDeadline = deadline.toEpochMilli() - } - - /** - * Build the trace. - */ - fun build(): SimTrace = builder.build() - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt deleted file mode 100644 index 4b3a6089..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ComputeWorkloads") - -package org.opendc.experiments.compute - -import org.opendc.experiments.compute.internal.CompositeComputeWorkload -import org.opendc.experiments.compute.internal.HpcSampledComputeWorkload -import org.opendc.experiments.compute.internal.LoadSampledComputeWorkload -import org.opendc.experiments.compute.internal.TraceComputeWorkload - -/** - * Construct a workload from a trace. - */ -public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) - -/** - * Construct a composite workload with the specified fractions. - */ -public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { - return CompositeComputeWorkload(pairs.toMap()) -} - -/** - * Sample a workload by a [fraction] of the total load. - */ -public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { - return LoadSampledComputeWorkload(this, fraction) -} - -/** - * Sample a workload by a [fraction] of the HPC VMs (count) - */ -public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { - return HpcSampledComputeWorkload(this, fraction) -} - -/** - * Sample a workload by a [fraction] of the HPC load - */ -public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { - return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt deleted file mode 100644 index eb85dbb8..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.failure.HostFaultInjector -import java.time.InstantSource -import java.util.random.RandomGenerator -import kotlin.coroutines.CoroutineContext - -/** - * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. - */ -public interface FailureModel { - /** - * Construct a [HostFaultInjector] for the specified [service]. - */ - public fun createInjector( - context: CoroutineContext, - clock: InstantSource, - service: ComputeService, - random: RandomGenerator - ): HostFaultInjector -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt deleted file mode 100644 index 679e370a..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("FailureModels") - -package org.opendc.experiments.compute - -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import java.time.Duration -import java.time.InstantSource -import java.util.random.RandomGenerator -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln - -/** - * Obtain a [FailureModel] based on the GRID'5000 failure trace. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -public fun grid5000(failureInterval: Duration): FailureModel { - return object : FailureModel { - override fun createInjector( - context: CoroutineContext, - clock: InstantSource, - service: ComputeService, - random: RandomGenerator - ): HostFaultInjector { - val rng = Well19937c(random.nextLong()) - val hosts = service.hosts.map { it as SimHost }.toSet() - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) - } - - override fun toString(): String = "Grid5000FailureModel" - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt deleted file mode 100644 index 310aa54c..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.compute.topology.HostSpec -import org.opendc.experiments.provisioner.ProvisioningContext -import org.opendc.experiments.provisioner.ProvisioningStep -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.flow2.FlowEngine -import java.util.SplittableRandom - -/** - * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. - * - * @param serviceDomain The domain name under which the compute service is registered. - * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. - * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. - */ -public class HostsProvisioningStep internal constructor( - private val serviceDomain: String, - private val specs: List<HostSpec>, - private val optimize: Boolean -) : ProvisioningStep { - override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val engine = FlowEngine.create(ctx.dispatcher) - val graph = engine.newGraph() - val hosts = mutableSetOf<SimHost>() - - for (spec in specs) { - val machine = SimBareMetalMachine.create(graph, spec.model, spec.psuFactory) - val hypervisor = SimHypervisor.create(spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong())) - - val host = SimHost( - spec.uid, - spec.name, - spec.meta, - ctx.dispatcher.timeSource, - machine, - hypervisor, - optimize = optimize - ) - - require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } - service.addHost(host) - } - - return AutoCloseable { - for (host in hosts) { - host.close() - } - } - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt deleted file mode 100644 index 8de4fdef..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("TraceHelpers") - -package org.opendc.experiments.compute - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.yield -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.service.ComputeService -import java.time.InstantSource -import java.util.Random -import kotlin.coroutines.coroutineContext -import kotlin.math.max - -public class RunningServerWatcher : ServerWatcher { - - private val _mutex: Mutex = Mutex() - - public suspend fun lock() { - _mutex.lock() - } - - public suspend fun wait() { - // TODO: look at the better way to wait for an unlock - this.lock() - } - - override fun onStateChanged(server: Server, newState: ServerState) { - when (newState) { - ServerState.TERMINATED -> { - _mutex.unlock() - } - ServerState.ERROR -> { - _mutex.unlock() - } - ServerState.DELETED -> { - _mutex.unlock() - } - else -> {} - } - } -} - -/** - * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. - * - * @param clock The simulation clock. - * @param trace The trace to simulate. - * @param seed The seed to use for randomness. - * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). - * @param failureModel A failure model to use for injecting failures. - * @param interference A flag to indicate that VM interference needs to be enabled. - */ -public suspend fun ComputeService.replay( - clock: InstantSource, - trace: List<VirtualMachine>, - seed: Long, - submitImmediately: Boolean = false, - failureModel: FailureModel? = null, - interference: Boolean = false -) { - val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed)) - val client = newClient() - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - injector?.start() - - var simulationOffset = Long.MIN_VALUE - - for (entry in trace.sortedBy { it.startTime }) { - val now = clock.millis() - val start = entry.startTime.toEpochMilli() - - // Set the simulationOffset based on the starting time of the first server - if (simulationOffset == Long.MIN_VALUE) { - simulationOffset = start - now - } - - // Make sure the trace entries are ordered by submission time -// assert(start - simulationOffset >= 0) { "Invalid trace order" } - - // Delay the server based on the startTime given by the trace. - if (!submitImmediately) { - delay(max(0, (start - now - simulationOffset))) - } - - val workload = entry.trace.createWorkload(start) - val meta = mutableMapOf<String, Any>("workload" to workload) - - val interferenceProfile = entry.interferenceProfile - if (interference && interferenceProfile != null) { - meta["interference-profile"] = interferenceProfile - } - - launch { - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.cpuCount, - entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() - ), - meta = meta - ) - - val serverWatcher = RunningServerWatcher() - serverWatcher.lock() - server.watch(serverWatcher) - - // Wait until the server is terminated - serverWatcher.wait() - - // Stop the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - yield() - } finally { - injector?.close() - client.close() - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt deleted file mode 100644 index 509af59f..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute - -import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile -import org.opendc.simulator.compute.workload.SimTrace -import java.time.Instant -import java.util.UUID - -/** - * A virtual machine workload. - * - * @param uid The unique identifier of the virtual machine. - * @param name The name of the virtual machine. - * @param cpuCapacity The required CPU capacity for the VM in MHz. - * @param cpuCount The number of vCPUs in the VM. - * @param memCapacity The provisioned memory for the VM in MB. - * @param startTime The start time of the VM. - * @param stopTime The stop time of the VM. - * @param trace The trace that belong to this VM. - * @param interferenceProfile The interference profile of this virtual machine. - */ -public data class VirtualMachine( - val uid: UUID, - val name: String, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Long, - val totalLoad: Double, - val startTime: Instant, - val stopTime: Instant, - val trace: SimTrace, - val interferenceProfile: VmInterferenceProfile? -) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt deleted file mode 100644 index a104851f..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.opendc.experiments.compute.telemetry.ComputeMonitor -import org.opendc.experiments.compute.telemetry.table.HostTableReader -import org.opendc.experiments.compute.telemetry.table.ServerTableReader -import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. - */ -public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) - } - - override fun record(reader: HostTableReader) { - hostWriter.write(reader) - } - - override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) - } - - override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt deleted file mode 100644 index 60629a95..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import mu.KotlinLogging -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * A writer that writes data in Parquet format. - * - * @param path The path to the file to write the data to. - * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. - */ -public abstract class ParquetDataWriter<in T>( - path: File, - private val writeSupport: WriteSupport<T>, - bufferSize: Int = 4096 -) : AutoCloseable { - /** - * The logging instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The queue of records to process. - */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) - - /** - * An exception to be propagated to the actual writer. - */ - private var exception: Throwable? = null - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = this.toString()) { - val writer = let { - val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - buildWriter(builder) - } - - val queue = queue - val buf = mutableListOf<T>() - var shouldStop = false - - try { - while (!shouldStop) { - try { - writer.write(queue.take()) - } catch (e: InterruptedException) { - shouldStop = true - } - - if (queue.drainTo(buf) > 0) { - for (data in buf) { - writer.write(data) - } - buf.clear() - } - } - } catch (e: Throwable) { - logger.error(e) { "Failure in Parquet data writer" } - exception = e - } finally { - writer.close() - } - } - - /** - * Build the [ParquetWriter] used to write the Parquet files. - */ - protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { - return builder.build() - } - - /** - * Write the specified metrics to the database. - */ - public fun write(data: T) { - val exception = exception - if (exception != null) { - throw IllegalStateException("Writer thread failed", exception) - } - - queue.put(data) - } - - /** - * Signal the writer to stop. - */ - override fun close() { - writerThread.interrupt() - writerThread.join() - } - - init { - writerThread.start() - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 735101df..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.experiments.compute.telemetry.table.HostTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [HostTableReader]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { - - override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "host-writer" - - /** - * A [WriteSupport] implementation for a [HostTableReader]. - */ - private class HostDataWriteSupport : WriteSupport<HostTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: HostTableReader) { - write(recordConsumer, record) - } - - private fun write(consumer: RecordConsumer, data: HostTableReader) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("host_id", 1) - consumer.addBinary(Binary.fromString(data.host.id)) - consumer.endField("host_id", 1) - - consumer.startField("cpu_count", 2) - consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 2) - - consumer.startField("mem_capacity", 3) - consumer.addLong(data.host.memCapacity) - consumer.endField("mem_capacity", 3) - - consumer.startField("guests_terminated", 4) - consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 4) - - consumer.startField("guests_running", 5) - consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 5) - - consumer.startField("guests_error", 6) - consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 6) - - consumer.startField("guests_invalid", 7) - consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 7) - - consumer.startField("cpu_limit", 8) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 8) - - consumer.startField("cpu_usage", 9) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_usage", 9) - - consumer.startField("cpu_demand", 10) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_demand", 10) - - consumer.startField("cpu_utilization", 11) - consumer.addDouble(data.cpuUtilization) - consumer.endField("cpu_utilization", 11) - - consumer.startField("cpu_time_active", 12) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 12) - - consumer.startField("cpu_time_idle", 13) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 13) - - consumer.startField("cpu_time_steal", 14) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 14) - - consumer.startField("cpu_time_lost", 15) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 15) - - consumer.startField("power_total", 16) - consumer.addDouble(data.powerTotal) - consumer.endField("power_total", 16) - - consumer.startField("uptime", 17) - consumer.addLong(data.uptime) - consumer.endField("uptime", 17) - - consumer.startField("downtime", 18) - consumer.addLong(data.downtime) - consumer.endField("downtime", 18) - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 19) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 19) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the host data. - */ - val SCHEMA: MessageType = Types - .buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_demand"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_utilization"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("power_total"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time") - ) - .named("host") - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index e4c369fa..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.experiments.compute.telemetry.table.ServerTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [ServerTableReader]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { - - override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "server-writer" - - /** - * A [WriteSupport] implementation for a [ServerTableReader]. - */ - private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServerTableReader) { - write(recordConsumer, record) - } - - private fun write(consumer: RecordConsumer, data: ServerTableReader) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("server_id", 1) - consumer.addBinary(Binary.fromString(data.server.id)) - consumer.endField("server_id", 1) - - consumer.startField("server_name", 2) - consumer.addBinary(Binary.fromString(data.server.name)) - consumer.endField("server_name", 2) - - val hostId = data.host?.id - if (hostId != null) { - consumer.startField("host_id", 3) - consumer.addBinary(Binary.fromString(hostId)) - consumer.endField("host_id", 3) - } - - consumer.startField("mem_capacity", 4) - consumer.addLong(data.server.memCapacity) - consumer.endField("mem_capacity", 4) - - consumer.startField("cpu_count", 5) - consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 5) - - consumer.startField("cpu_limit", 6) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 6) - - consumer.startField("cpu_time_active", 7) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 7) - - consumer.startField("cpu_time_idle", 8) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 8) - - consumer.startField("cpu_time_steal", 9) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 9) - - consumer.startField("cpu_time_lost", 10) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 10) - - consumer.startField("uptime", 11) - consumer.addLong(data.uptime) - consumer.endField("uptime", 11) - - consumer.startField("downtime", 12) - consumer.addLong(data.downtime) - consumer.endField("downtime", 12) - - val provisionTime = data.provisionTime - if (provisionTime != null) { - consumer.startField("provision_time", 13) - consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 13) - } - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 14) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 14) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the server data. - */ - val SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time") - - ) - .named("server") - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index 39bc2d11..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import java.io.File - -/** - * A Parquet event writer for [ServiceTableReader]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) { - - override fun toString(): String = "service-writer" - - /** - * A [WriteSupport] implementation for a [ServiceTableReader]. - */ - private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServiceTableReader) { - write(recordConsumer, record) - } - - private fun write(consumer: RecordConsumer, data: ServiceTableReader) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("hosts_up", 1) - consumer.addInteger(data.hostsUp) - consumer.endField("hosts_up", 1) - - consumer.startField("hosts_down", 2) - consumer.addInteger(data.hostsDown) - consumer.endField("hosts_down", 2) - - consumer.startField("servers_pending", 3) - consumer.addInteger(data.serversPending) - consumer.endField("servers_pending", 3) - - consumer.startField("servers_active", 4) - consumer.addInteger(data.serversActive) - consumer.endField("servers_active", 4) - - consumer.startField("attempts_success", 5) - consumer.addInteger(data.attemptsSuccess) - consumer.endField("attempts_pending", 5) - - consumer.startField("attempts_failure", 6) - consumer.addInteger(data.attemptsFailure) - consumer.endField("attempts_failure", 6) - - consumer.startField("attempts_error", 7) - consumer.addInteger(data.attemptsError) - consumer.endField("attempts_error", 7) - - consumer.endMessage() - } - } - - private companion object { - private val SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_up"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_down"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_pending"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_success"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_failure"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_error") - ) - .named("service") - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt deleted file mode 100644 index a3f2d597..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.apache.parquet.io.api.Binary -import java.nio.ByteBuffer -import java.util.UUID - -/** - * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet. - */ -internal fun UUID.toBinary(): Binary { - val bb = ByteBuffer.allocate(16) - bb.putLong(mostSignificantBits) - bb.putLong(leastSignificantBits) - bb.rewind() - return Binary.fromConstantByteBuffer(bb) -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt deleted file mode 100644 index ca23a7c5..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.internal - -import mu.KotlinLogging -import org.opendc.experiments.compute.ComputeWorkload -import org.opendc.experiments.compute.ComputeWorkloadLoader -import org.opendc.experiments.compute.VirtualMachine -import java.util.random.RandomGenerator - -/** - * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. - */ -internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { - /** - * The logging instance of this class. - */ - private val logger = KotlinLogging.logger {} - - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { - val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - - val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } - - val res = mutableListOf<VirtualMachine>() - - for ((fraction, vms) in traces) { - var currentLoad = 0.0 - - for (entry in vms) { - val entryLoad = entry.totalLoad - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - } - - val vmCount = traces.sumOf { (_, vms) -> vms.size } - logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - - return res - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt deleted file mode 100644 index 583405da..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.internal - -import mu.KotlinLogging -import org.opendc.experiments.compute.ComputeWorkload -import org.opendc.experiments.compute.ComputeWorkloadLoader -import org.opendc.experiments.compute.VirtualMachine -import java.util.UUID -import java.util.random.RandomGenerator - -/** - * A [ComputeWorkload] that samples HPC VMs in the workload. - * - * @param fraction The fraction of load/virtual machines to sample - * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. - */ -internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { - /** - * The logging instance of this class. - */ - private val logger = KotlinLogging.logger {} - - /** - * The pattern to match compute nodes in the workload. - */ - private val pattern = Regex("^(ComputeNode|cn).*") - - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { - val vms = source.resolve(loader, random) - - val (hpc, nonHpc) = vms.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<VirtualMachine>() - hpc.mapTo(res) { sample(it, index) } - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<VirtualMachine>() - nonHpc.mapTo(res) { sample(it, index) } - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = vms.sumOf { it.totalLoad } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf<VirtualMachine>() - - if (sampleLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.totalLoad - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.totalLoad - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * vms.size).toInt()) - .forEach { entry -> - hpcLoad += entry.totalLoad - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * vms.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.totalLoad - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res - } - - /** - * Sample a random trace entry. - */ - private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { - val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) - return entry.copy(uid = uid) - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt deleted file mode 100644 index ffb7e0c6..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.internal - -import mu.KotlinLogging -import org.opendc.experiments.compute.ComputeWorkload -import org.opendc.experiments.compute.ComputeWorkloadLoader -import org.opendc.experiments.compute.VirtualMachine -import java.util.random.RandomGenerator - -/** - * A [ComputeWorkload] that is sampled based on total load. - */ -internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { - /** - * The logging instance of this class. - */ - private val logger = KotlinLogging.logger {} - - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { - val vms = source.resolve(loader, random) - val res = mutableListOf<VirtualMachine>() - - val totalLoad = vms.sumOf { it.totalLoad } - var currentLoad = 0.0 - - for (entry in vms) { - val entryLoad = entry.totalLoad - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt deleted file mode 100644 index d9e311cd..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.internal - -import org.opendc.experiments.compute.ComputeWorkload -import org.opendc.experiments.compute.ComputeWorkloadLoader -import org.opendc.experiments.compute.VirtualMachine -import java.util.random.RandomGenerator - -/** - * A [ComputeWorkload] from a trace. - */ -internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { - return loader.get(name, format) - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt deleted file mode 100644 index 995432d4..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import mu.KotlinLogging -import org.opendc.common.Dispatcher -import org.opendc.common.asCoroutineDispatcher -import org.opendc.compute.api.Server -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.experiments.compute.telemetry.table.HostInfo -import org.opendc.experiments.compute.telemetry.table.HostTableReader -import org.opendc.experiments.compute.telemetry.table.ServerInfo -import org.opendc.experiments.compute.telemetry.table.ServerTableReader -import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import java.time.Duration -import java.time.Instant - -/** - * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every - * export interval. - * - * @param dispatcher A [Dispatcher] for scheduling the future events. - * @param service The [ComputeService] to monitor. - * @param monitor The monitor to export the metrics to. - * @param exportInterval The export interval. - */ -public class ComputeMetricReader( - dispatcher: Dispatcher, - private val service: ComputeService, - private val monitor: ComputeMonitor, - private val exportInterval: Duration = Duration.ofMinutes(5) -) : AutoCloseable { - private val logger = KotlinLogging.logger {} - private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) - private val clock = dispatcher.timeSource - - /** - * Aggregator for service metrics. - */ - private val serviceTableReader = ServiceTableReaderImpl(service) - - /** - * Mapping from [Host] instances to [HostTableReaderImpl] - */ - private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() - - /** - * Mapping from [Server] instances to [ServerTableReaderImpl] - */ - private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() - - /** - * The background job that is responsible for collecting the metrics every cycle. - */ - private val job = scope.launch { - val intervalMs = exportInterval.toMillis() - try { - while (isActive) { - delay(intervalMs) - - loggState() - } - } finally { - loggState() - - if (monitor is AutoCloseable) { - monitor.close() - } - } - } - - private fun loggState() { - try { - val now = this.clock.instant() - - for (host in this.service.hosts) { - val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() - } - - for (server in this.service.servers) { - val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() - } - - this.serviceTableReader.record(now) - monitor.record(this.serviceTableReader.copy()) - } catch (cause: Throwable) { - this.logger.warn(cause) { "Exporter threw an Exception" } - } - } - - override fun close() { - job.cancel() - } - - /** - * An aggregator for service metrics before they are reported. - */ - private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { - - override fun copy(): ServiceTableReader { - val newServiceTable = ServiceTableReaderImpl(service) - newServiceTable.setValues(this) - - return newServiceTable - } - - override fun setValues(table: ServiceTableReader) { - _timestamp = table.timestamp - - _hostsUp = table.hostsUp - _hostsDown = table.hostsDown - _serversTotal = table.serversTotal - _serversPending = table.serversPending - _serversActive = table.serversActive - _attemptsSuccess = table.attemptsSuccess - _attemptsFailure = table.attemptsFailure - _attemptsError = table.attemptsError - } - - private var _timestamp: Instant = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val hostsUp: Int - get() = _hostsUp - private var _hostsUp = 0 - - override val hostsDown: Int - get() = _hostsDown - private var _hostsDown = 0 - - override val serversTotal: Int - get() = _serversTotal - private var _serversTotal = 0 - - override val serversPending: Int - get() = _serversPending - private var _serversPending = 0 - - override val serversActive: Int - get() = _serversActive - private var _serversActive = 0 - - override val attemptsSuccess: Int - get() = _attemptsSuccess - private var _attemptsSuccess = 0 - - override val attemptsFailure: Int - get() = _attemptsFailure - private var _attemptsFailure = 0 - - override val attemptsError: Int - get() = _attemptsError - private var _attemptsError = 0 - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - _timestamp = now - - val stats = service.getSchedulerStats() - _hostsUp = stats.hostsAvailable - _hostsDown = stats.hostsUnavailable - _serversTotal = stats.serversTotal - _serversPending = stats.serversPending - _serversActive = stats.serversActive - _attemptsSuccess = stats.attemptsSuccess.toInt() - _attemptsFailure = stats.attemptsFailure.toInt() - _attemptsError = stats.attemptsError.toInt() - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - private class HostTableReaderImpl(host: Host) : HostTableReader { - override fun copy(): HostTableReader { - val newHostTable = HostTableReaderImpl(_host) - newHostTable.setValues(this) - - return newHostTable - } - - override fun setValues(table: HostTableReader) { - _timestamp = table.timestamp - _guestsTerminated = table.guestsTerminated - _guestsRunning = table.guestsRunning - _guestsError = table.guestsError - _guestsInvalid = table.guestsInvalid - _cpuLimit = table.cpuLimit - _cpuDemand = table.cpuDemand - _cpuUsage = table.cpuUsage - _cpuUtilization = table.cpuUtilization - _cpuActiveTime = table.cpuActiveTime - _cpuIdleTime = table.cpuIdleTime - _cpuStealTime = table.cpuStealTime - _cpuLostTime = table.cpuLostTime - _powerUsage = table.powerUsage - _powerTotal = table.powerTotal - _uptime = table.uptime - _downtime = table.downtime - _bootTime = table.bootTime - } - - private val _host = host - - override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) - - override val timestamp: Instant - get() = _timestamp - private var _timestamp = Instant.MIN - - override val guestsTerminated: Int - get() = _guestsTerminated - private var _guestsTerminated = 0 - - override val guestsRunning: Int - get() = _guestsRunning - private var _guestsRunning = 0 - - override val guestsError: Int - get() = _guestsError - private var _guestsError = 0 - - override val guestsInvalid: Int - get() = _guestsInvalid - private var _guestsInvalid = 0 - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuUsage: Double - get() = _cpuUsage - private var _cpuUsage = 0.0 - - override val cpuDemand: Double - get() = _cpuDemand - private var _cpuDemand = 0.0 - - override val cpuUtilization: Double - get() = _cpuUtilization - private var _cpuUtilization = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val powerUsage: Double - get() = _powerUsage - private var _powerUsage = 0.0 - - override val powerTotal: Double - get() = _powerTotal - previousPowerTotal - private var _powerTotal = 0.0 - private var previousPowerTotal = 0.0 - - override val uptime: Long - get() = _uptime - previousUptime - private var _uptime = 0L - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime = 0L - private var previousDowntime = 0L - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - val hostCpuStats = _host.getCpuStats() - val hostSysStats = _host.getSystemStats() - - _timestamp = now - _guestsTerminated = hostSysStats.guestsTerminated - _guestsRunning = hostSysStats.guestsRunning - _guestsError = hostSysStats.guestsError - _guestsInvalid = hostSysStats.guestsInvalid - _cpuLimit = hostCpuStats.capacity - _cpuDemand = hostCpuStats.demand - _cpuUsage = hostCpuStats.usage - _cpuUtilization = hostCpuStats.utilization - _cpuActiveTime = hostCpuStats.activeTime - _cpuIdleTime = hostCpuStats.idleTime - _cpuStealTime = hostCpuStats.stealTime - _cpuLostTime = hostCpuStats.lostTime - _powerUsage = hostSysStats.powerUsage - _powerTotal = hostSysStats.energyUsage - _uptime = hostSysStats.uptime.toMillis() - _downtime = hostSysStats.downtime.toMillis() - _bootTime = hostSysStats.bootTime - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - // Reset intermediate state for next aggregation - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - previousPowerTotal = _powerTotal - previousUptime = _uptime - previousDowntime = _downtime - - _guestsTerminated = 0 - _guestsRunning = 0 - _guestsError = 0 - _guestsInvalid = 0 - - _cpuLimit = 0.0 - _cpuUsage = 0.0 - _cpuDemand = 0.0 - _cpuUtilization = 0.0 - - _powerUsage = 0.0 - } - } - - /** - * An aggregator for server metrics before they are reported. - */ - private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { - override fun copy(): ServerTableReader { - val newServerTable = ServerTableReaderImpl(service, _server) - newServerTable.setValues(this) - - return newServerTable - } - - override fun setValues(table: ServerTableReader) { - host = table.host - - _timestamp = table.timestamp - _cpuLimit = table.cpuLimit - _cpuActiveTime = table.cpuActiveTime - _cpuIdleTime = table.cpuIdleTime - _cpuStealTime = table.cpuStealTime - _cpuLostTime = table.cpuLostTime - _uptime = table.uptime - _downtime = table.downtime - _provisionTime = table.provisionTime - _bootTime = table.bootTime - } - - private val _server = server - - /** - * The static information about this server. - */ - override val server = ServerInfo( - server.uid.toString(), - server.name, - "vm", - "x86", - server.image.uid.toString(), - server.image.name, - server.flavor.cpuCount, - server.flavor.memorySize - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - override var host: HostInfo? = null - private var _host: Host? = null - - private var _timestamp = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val uptime: Long - get() = _uptime - previousUptime - private var _uptime: Long = 0 - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime: Long = 0 - private var previousDowntime = 0L - - override val provisionTime: Instant? - get() = _provisionTime - private var _provisionTime: Instant? = null - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - val newHost = service.lookupHost(_server) - if (newHost != null && newHost.uid != _host?.uid) { - _host = newHost - host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) - } - - val cpuStats = _host?.getCpuStats(_server) - val sysStats = _host?.getSystemStats(_server) - - _timestamp = now - _cpuLimit = cpuStats?.capacity ?: 0.0 - _cpuActiveTime = cpuStats?.activeTime ?: 0 - _cpuIdleTime = cpuStats?.idleTime ?: 0 - _cpuStealTime = cpuStats?.stealTime ?: 0 - _cpuLostTime = cpuStats?.lostTime ?: 0 - _uptime = sysStats?.uptime?.toMillis() ?: 0 - _downtime = sysStats?.downtime?.toMillis() ?: 0 - _provisionTime = _server.launchedAt - _bootTime = sysStats?.bootTime - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - previousUptime = _uptime - previousDowntime = _downtime - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - - _host = null - _cpuLimit = 0.0 - } - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt deleted file mode 100644 index ff36bef3..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry - -import org.opendc.experiments.compute.telemetry.table.HostTableReader -import org.opendc.experiments.compute.telemetry.table.ServerTableReader -import org.opendc.experiments.compute.telemetry.table.ServiceTableReader - -/** - * A monitor that tracks the metrics and events of the OpenDC Compute service. - */ -public interface ComputeMonitor { - /** - * Record an entry with the specified [reader]. - */ - public fun record(reader: ServerTableReader) {} - - /** - * Record an entry with the specified [reader]. - */ - public fun record(reader: HostTableReader) {} - - /** - * Record an entry with the specified [reader]. - */ - public fun record(reader: ServiceTableReader) {} -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt deleted file mode 100644 index 665611dd..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry - -import org.opendc.compute.service.ComputeService -import org.opendc.experiments.provisioner.ProvisioningContext -import org.opendc.experiments.provisioner.ProvisioningStep -import java.time.Duration - -/** - * A [ProvisioningStep] that provisions a [ComputeMetricReader] to periodically collect the metrics of a [ComputeService] - * and report them to a [ComputeMonitor]. - */ -public class ComputeMonitorProvisioningStep internal constructor( - private val serviceDomain: String, - private val monitor: ComputeMonitor, - private val exportInterval: Duration -) : ProvisioningStep { - override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) - return AutoCloseable { metricReader.close() } - } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt deleted file mode 100644 index 84dd7a4f..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -/** - * Information about a host exposed to the telemetry service. - */ -public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt deleted file mode 100644 index 66ed0454..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a host trace entry. - */ -public interface HostTableReader { - - public fun copy(): HostTableReader - - public fun setValues(table: HostTableReader) - - /** - * The [HostInfo] of the host to which the row belongs to. - */ - public val host: HostInfo - - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The number of guests that are in a terminated state. - */ - public val guestsTerminated: Int - - /** - * The number of guests that are in a running state. - */ - public val guestsRunning: Int - - /** - * The number of guests that are in an error state. - */ - public val guestsError: Int - - /** - * The number of guests that are in an unknown state. - */ - public val guestsInvalid: Int - - /** - * The capacity of the CPUs in the host (in MHz). - */ - public val cpuLimit: Double - - /** - * The usage of all CPUs in the host (in MHz). - */ - public val cpuUsage: Double - - /** - * The demand of all vCPUs of the guests (in MHz) - */ - public val cpuDemand: Double - - /** - * The CPU utilization of the host. - */ - public val cpuUtilization: Double - - /** - * The duration (in seconds) that a CPU was active in the host. - */ - public val cpuActiveTime: Long - - /** - * The duration (in seconds) that a CPU was idle in the host. - */ - public val cpuIdleTime: Long - - /** - * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. - */ - public val cpuStealTime: Long - - /** - * The duration (in seconds) of CPU time that was lost due to interference. - */ - public val cpuLostTime: Long - - /** - * The current power usage of the host in W. - */ - public val powerUsage: Double - - /** - * The total power consumption of the host since last time in J. - */ - public val powerTotal: Double - - /** - * The uptime of the host since last time in ms. - */ - public val uptime: Long - - /** - * The downtime of the host since last time in ms. - */ - public val downtime: Long - - /** - * The [Instant] at which the host booted. - */ - public val bootTime: Instant? -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt deleted file mode 100644 index fc360fee..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -/** - * Static information about a server exposed to the telemetry service. - */ -public data class ServerInfo( - val id: String, - val name: String, - val type: String, - val arch: String, - val imageId: String, - val imageName: String, - val cpuCount: Int, - val memCapacity: Long -) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt deleted file mode 100644 index de3a884a..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a server trace entry. - */ -public interface ServerTableReader { - - public fun copy(): ServerTableReader - - public fun setValues(table: ServerTableReader) - - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The [ServerInfo] of the server to which the row belongs to. - */ - public val server: ServerInfo - - /** - * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. - */ - public val host: HostInfo? - - /** - * The uptime of the host since last time in ms. - */ - public val uptime: Long - - /** - * The downtime of the host since last time in ms. - */ - public val downtime: Long - - /** - * The [Instant] at which the server was enqueued for the scheduler. - */ - public val provisionTime: Instant? - - /** - * The [Instant] at which the server booted. - */ - public val bootTime: Instant? - - /** - * The capacity of the CPUs of the servers (in MHz). - */ - public val cpuLimit: Double - - /** - * The duration (in seconds) that a CPU was active in the server. - */ - public val cpuActiveTime: Long - - /** - * The duration (in seconds) that a CPU was idle in the server. - */ - public val cpuIdleTime: Long - - /** - * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. - */ - public val cpuStealTime: Long - - /** - * The duration (in seconds) of CPU time that was lost due to interference. - */ - public val cpuLostTime: Long -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt deleted file mode 100644 index e19d7c68..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -import java.time.Instant - -/** - * A trace entry for the compute service. - */ -public data class ServiceData( - val timestamp: Instant, - val hostsUp: Int, - val hostsDown: Int, - val serversTotal: Int, - val serversPending: Int, - val serversActive: Int, - val attemptsSuccess: Int, - val attemptsFailure: Int, - val attemptsError: Int -) - -/** - * Convert a [ServiceTableReader] into a persistent object. - */ -public fun ServiceTableReader.toServiceData(): ServiceData { - return ServiceData( - timestamp, - hostsUp, - hostsDown, - serversTotal, - serversPending, - serversActive, - attemptsSuccess, - attemptsFailure, - attemptsError - ) -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt deleted file mode 100644 index a077a476..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.telemetry.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a service trace entry. - */ -public interface ServiceTableReader { - - public fun copy(): ServiceTableReader - - public fun setValues(table: ServiceTableReader) - - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The number of hosts that are up at this instant. - */ - public val hostsUp: Int - - /** - * The number of hosts that are down at this instant. - */ - public val hostsDown: Int - - /** - * The number of servers that are registered with the compute service.. - */ - public val serversTotal: Int - - /** - * The number of servers that are pending to be scheduled. - */ - public val serversPending: Int - - /** - * The number of servers that are currently active. - */ - public val serversActive: Int - - /** - * The scheduling attempts that were successful. - */ - public val attemptsSuccess: Int - - /** - * The scheduling attempts that were unsuccessful due to client error. - */ - public val attemptsFailure: Int - - /** - * The scheduling attempts that were unsuccessful due to scheduler error. - */ - public val attemptsError: Int -} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt deleted file mode 100644 index 08c3dca2..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.topology - -import org.opendc.simulator.compute.SimPsuFactories -import org.opendc.simulator.compute.SimPsuFactory -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory -import java.util.UUID - -/** - * Description of a physical host that will be simulated by OpenDC and host the virtual machines. - * - * @param uid Unique identifier of the host. - * @param name The name of the host. - * @param meta The metadata of the host. - * @param model The physical model of the machine. - * @param psuFactory The [SimPsuFactory] to construct the PSU that models the power consumption of the machine. - * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. - */ -public data class HostSpec( - val uid: UUID, - val name: String, - val meta: Map<String, Any>, - val model: MachineModel, - val psuFactory: SimPsuFactory = SimPsuFactories.noop(), - val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() -) diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt deleted file mode 100644 index 1cd9f20b..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.junit.jupiter.api.AfterEach -import java.nio.file.Files - -/** - * Test suite for [ParquetHostDataWriter] - */ -class HostDataWriterTest { - /** - * The path to write the data file to. - */ - private val path = Files.createTempFile("opendc", "parquet") - - /** - * The writer used to write the data. - */ - private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096) - - @AfterEach - fun tearDown() { - writer.close() - Files.deleteIfExists(path) - } - -// @Test -// fun testSmoke() { -// assertDoesNotThrow { -// writer.write(object : HostTableReader { -// override val timestamp: Instant = Instant.now() -// override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) -// override val guestsTerminated: Int = 0 -// override val guestsRunning: Int = 0 -// override val guestsError: Int = 0 -// override val guestsInvalid: Int = 0 -// override val cpuLimit: Double = 4096.0 -// override val cpuUsage: Double = 1.0 -// override val cpuDemand: Double = 1.0 -// override val cpuUtilization: Double = 0.0 -// override val cpuActiveTime: Long = 1 -// override val cpuIdleTime: Long = 1 -// override val cpuStealTime: Long = 1 -// override val cpuLostTime: Long = 1 -// override val powerUsage: Double = 1.0 -// override val powerTotal: Double = 1.0 -// override val uptime: Long = 1 -// override val downtime: Long = 1 -// override val bootTime: Instant? = null -// -// // override fun copy(): HostTableReader {return HostTableReader} -// -// override fun setValues(table: HostTableReader) {} -// }) -// } -// } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt deleted file mode 100644 index 21bc799f..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.junit.jupiter.api.AfterEach -import java.nio.file.Files - -/** - * Test suite for [ParquetServerDataWriter] - */ -class ServerDataWriterTest { - /** - * The path to write the data file to. - */ - private val path = Files.createTempFile("opendc", "parquet") - - /** - * The writer used to write the data. - */ - private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096) - - @AfterEach - fun tearDown() { - writer.close() - Files.deleteIfExists(path) - } - -// @Test -// fun testSmoke() { -// assertDoesNotThrow { -// writer.write(object : ServerTableReader { -// override val timestamp: Instant = Instant.now() -// override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096) -// override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) -// override val cpuLimit: Double = 4096.0 -// override val cpuActiveTime: Long = 1 -// override val cpuIdleTime: Long = 1 -// override val cpuStealTime: Long = 1 -// override val cpuLostTime: Long = 1 -// override val uptime: Long = 1 -// override val downtime: Long = 1 -// override val provisionTime: Instant = timestamp -// override val bootTime: Instant? = null -// }) -// } -// } -} diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt deleted file mode 100644 index 0cbb0812..00000000 --- a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.compute.export.parquet - -import org.junit.jupiter.api.AfterEach -import java.nio.file.Files - -/** - * Test suite for [ParquetServiceDataWriter] - */ -class ServiceDataWriterTest { - /** - * The path to write the data file to. - */ - private val path = Files.createTempFile("opendc", "parquet") - - /** - * The writer used to write the data. - */ - private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096) - - @AfterEach - fun tearDown() { - writer.close() - Files.deleteIfExists(path) - } - -// @Test -// fun testSmoke() { -// assertDoesNotThrow { -// writer.write(object : ServiceTableReader { -// override val timestamp: Instant = Instant.now() -// override val hostsUp: Int = 1 -// override val hostsDown: Int = 0 -// override val serversTotal: Int = 1 -// override val serversPending: Int = 1 -// override val serversActive: Int = 1 -// override val attemptsSuccess: Int = 1 -// override val attemptsFailure: Int = 0 -// override val attemptsError: Int = 0 -// }) -// } -// } -} |
