diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-19 14:33:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-19 14:33:05 +0200 |
| commit | 453c25c4b453fa0af26bebbd8863abfb79218119 (patch) | |
| tree | 7977e81ec34ce7f57d8b14a717ccb63f54cd03cb | |
| parent | c1b9719aad10566c9d17f9eb757236c58a602b89 (diff) | |
| parent | 76a0f8889a4990108bc7906556dec6381647404b (diff) | |
merge: Enable re-use of virtual machine workload helpers
This pull request enables re-use of virtual machine workload helpers by extracting the helpers into a
separate module which may be used by other experiments.
- Support workload/machine CPU count mismatch
- Extract common code out of Capelin experiments
- Support flexible topology creation
- Add option for optimizing SimHost simulation
- Support creating CPU-optimized topology
- Make workload sampling model extensible
- Add support for extended Bitbrains trace format
- Add support for Azure VM trace format
- Add support for internal OpenDC VM trace format
- Optimize OpenDC VM trace format
- Add tool for converting workload traces
- Remove dependency on SnakeYaml
**Breaking API Changes**
- `RESOURCE_NCPU` and `RESOURCE_STATE_NCPU` are renamed to `RESOURCE_CPU_COUNT` and `RESOURCE_STATE_CPU_COUNT` respectively.
97 files changed, 2008 insertions, 1332 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3f0e180b..82da905c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -20,7 +20,6 @@ parquet = "1.12.0" progressbar = "0.9.0" sentry = "5.1.2" slf4j = "1.7.32" -yaml = "1.29" [libraries] # Kotlin @@ -53,11 +52,11 @@ progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" } # Format jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" } -yaml = { module = "org.yaml:snakeyaml", version.ref = "yaml" } config = { module = "com.typesafe:config", version.ref = "config" } # HTTP client diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 793db907..ff55c585 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -68,7 +68,8 @@ public class SimHost( scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - interferenceDomain: VmInterferenceDomain? = null + interferenceDomain: VmInterferenceDomain? = null, + private val optimize: Boolean = false ) : Host, AutoCloseable { /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. @@ -98,7 +99,7 @@ public class SimHost( /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model.optimize(), powerDriver) /** * The hypervisor to run multiple workloads. @@ -319,6 +320,25 @@ public class SimHost( val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + return MachineModel(processingUnits, memoryUnits).optimize() + } + + /** + * Optimize the [MachineModel] for simulation. + */ + private fun MachineModel.optimize(): MachineModel { + if (!optimize) { + return this + } + + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + return MachineModel(processingUnits, memoryUnits) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt index 87903623..fcd9dd7e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -24,8 +24,8 @@ package org.opendc.compute.simulator.failure import org.apache.commons.math3.distribution.RealDistribution import org.opendc.compute.simulator.SimHost +import java.util.* import kotlin.math.roundToInt -import kotlin.random.Random /** * A [VictimSelector] that stochastically selects a set of hosts to be failed. diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 0c716183..e82cf203 100644 --- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -20,28 +20,28 @@ * SOFTWARE. */ -description = "Experiments for the Risk Analysis work" +description = "Support library for simulating VM-based workloads with OpenDC" /* Build configuration */ plugins { - `experiment-conventions` + `kotlin-library-conventions` `testing-conventions` } dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) + api(projects.opendcCompute.opendcComputeSimulator) + + implementation(projects.opendcTrace.opendcTraceOpendc) + implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) - implementation(libs.config) - implementation(libs.progressbar) - implementation(libs.clikt) - - implementation(libs.parquet) - testImplementation(libs.log4j.slf4j) + implementation(libs.jackson.databind) + implementation(libs.jackson.module.kotlin) + implementation(kotlin("reflect")) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 08304edc..78002c2f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -20,13 +20,16 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.compute.workload + +import java.util.* /** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. + * An interface that describes how a workload is resolved. */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ca937328..c92b212f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,30 +20,42 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.compute.workload -import org.opendc.experiments.capelin.trace.bp.BPTraceFormat +import mu.KotlinLogging import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.trace.* +import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File -import java.util.UUID +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.roundToLong /** - * A [TraceReader] for the internal VM workload trace format. + * A helper class for loading compute workload traces into memory. * - * @param path The directory of the traces. + * @param baseDir The directory containing the traces. */ -class RawParquetTraceReader(private val path: File) { +public class ComputeWorkloadLoader(private val baseDir: File) { /** - * The [Trace] that represents this trace. + * The logger for this instance. */ - private val trace = BPTraceFormat().open(path.toURI().toURL()) + private val logger = KotlinLogging.logger {} + + /** + * The [OdcVmTraceFormat] instance to load the traces + */ + private val format = OdcVmTraceFormat() + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() /** * Read the fragments into memory. */ - private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> { + private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() @@ -53,7 +65,7 @@ class RawParquetTraceReader(private val path: File) { val id = reader.get(RESOURCE_STATE_ID) val time = reader.get(RESOURCE_STATE_TIMESTAMP) val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( @@ -63,7 +75,7 @@ class RawParquetTraceReader(private val path: File) { cores ) - fragments.getOrPut(id) { mutableListOf() }.add(fragment) + fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment) } fragments @@ -75,11 +87,11 @@ class RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() var counter = 0 - val entries = mutableListOf<TraceEntry<SimWorkload>>() + val entries = mutableListOf<VirtualMachine>() return try { while (reader.nextRow()) { @@ -91,28 +103,30 @@ class RawParquetTraceReader(private val path: File) { val submissionTime = reader.get(RESOURCE_START_TIME) val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_NCPUS) + val maxCores = reader.getInt(RESOURCE_CPU_COUNT) val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) + val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs + entries.add( - TraceEntry( - uid, id, submissionTime.toEpochMilli(), workload, - mapOf( - "submit-time" to submissionTime.toEpochMilli(), - "end-time" to endTime.toEpochMilli(), - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory.toLong(), - "workload" to workload - ) + VirtualMachine( + uid, + id, + maxCores, + requiredMemory.roundToLong(), + totalLoad, + submissionTime, + endTime, + vmFragments ) ) } + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + entries } catch (e: Exception) { e.printStackTrace() @@ -123,17 +137,24 @@ class RawParquetTraceReader(private val path: File) { } /** - * The entries in the trace. + * Load the trace with the specified [name]. */ - private val entries: List<TraceEntry<SimWorkload>> + public fun get(name: String): List<VirtualMachine> { + return cache.computeIfAbsent(name) { + val path = baseDir.resolve(it) + + logger.info { "Loading trace $it at $path" } - init { - val fragments = parseFragments() - entries = parseMeta(fragments) + val trace = format.open(path.toURI().toURL()) + val fragments = parseFragments(trace) + parseMeta(trace, fragments) + } } /** - * Read the entries in the trace. + * Clear the workload cache. */ - fun read(): List<TraceEntry<SimWorkload>> = entries + public fun reset() { + cache.clear() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index 065a8c93..ed45bd8a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.util +package org.opendc.compute.workload import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer @@ -33,42 +33,42 @@ import kotlinx.coroutines.yield import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.env.MachineDef -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.* import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max /** - * Helper class to manage a [ComputeService] simulation. + * Helper class to simulated VM-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param scheduler [ComputeScheduler] implementation to use for the service. + * @param failureModel A failure model to use for injecting failures. + * @param interferenceModel The model to use for performance interference. */ -class ComputeServiceSimulator( +public class ComputeWorkloadRunner( private val context: CoroutineContext, private val clock: Clock, scheduler: ComputeScheduler, - machines: List<MachineDef>, private val failureModel: FailureModel? = null, - interferenceModel: VmInterferenceModel? = null, - hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() + private val interferenceModel: VmInterferenceModel? = null, ) : AutoCloseable { /** * The [ComputeService] that has been configured by the manager. */ - val service: ComputeService + public val service: ComputeService /** * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. */ - val producers: List<MetricProducer> + public val producers: List<MetricProducer> get() = _metricProducers private val _metricProducers = mutableListOf<MetricProducer>() @@ -86,20 +86,14 @@ class ComputeServiceSimulator( val (service, serviceMeterProvider) = createService(scheduler) this._metricProducers.add(serviceMeterProvider) this.service = service - - for (def in machines) { - val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) - this._metricProducers.add(hostMeterProvider) - hosts.add(host) - this.service.addHost(host) - } } /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. */ - suspend fun run(reader: TraceReader<SimWorkload>) { - val injector = failureModel?.createInjector(context, clock, service) + public suspend fun run(trace: List<VirtualMachine>, seed: Long) { + val random = Random(seed) + val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() // Create new image for the virtual machine @@ -112,35 +106,36 @@ class ComputeServiceSimulator( var offset = Long.MIN_VALUE - while (reader.hasNext()) { - val entry = reader.next() + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() if (offset < 0) { - offset = entry.start - clock.millis() + offset = start - now } // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) + assert(start - offset >= 0) { "Invalid trace order" } + delay(max(0, (start - offset) - now)) launch { val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + val workload = SimTraceWorkload(entry.trace, workloadOffset) val server = client.newServer( entry.name, image, client.newFlavor( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long + entry.cpuCount, + entry.memCapacity ), - meta = entry.meta + mapOf("workload" to workload) + meta = mapOf("workload" to workload) ) // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) // Delete the server after reaching the end-time of the virtual machine server.delete() @@ -151,11 +146,52 @@ class ComputeServiceSimulator( yield() } finally { injector?.close() - reader.close() client.close() } } + /** + * Register a host for this simulation. + * + * @param spec The definition of the host. + * @param optimize Merge the CPU resources of the host into a single CPU resource. + * @return The [SimHost] that has been constructed by the runner. + */ + public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { + val resource = Resource.builder() + .put(HOST_ID, spec.uid.toString()) + .put(HOST_NAME, spec.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, spec.model.cpus.size) + .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + _metricProducers.add(meterProvider) + + val host = SimHost( + spec.uid, + spec.name, + spec.model, + spec.meta, + context, + interpreter, + meterProvider, + spec.hypervisor, + powerDriver = spec.powerDriver, + interferenceDomain = interferenceModel?.newDomain(), + optimize = optimize + ) + + hosts.add(host) + service.addHost(host) + + return host + } + override fun close() { service.close() @@ -182,41 +218,4 @@ class ComputeServiceSimulator( val service = ComputeService(context, clock, meterProvider, scheduler) return service to meterProvider } - - /** - * Construct a [SimHost] instance for the specified [MachineDef]. - */ - private fun createHost( - def: MachineDef, - hypervisorProvider: SimHypervisorProvider, - interferenceModel: VmInterferenceModel? = null - ): Pair<SimHost, SdkMeterProvider> { - val resource = Resource.builder() - .put(HOST_ID, def.uid.toString()) - .put(HOST_NAME, def.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, def.model.cpus.size) - .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val host = SimHost( - def.uid, - def.name, - def.model, - def.meta, - context, - interpreter, - meterProvider, - hypervisorProvider, - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - - return host to meterProvider - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt new file mode 100644 index 00000000..f58ce587 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") +package org.opendc.compute.workload + +import org.opendc.compute.workload.internal.CompositeComputeWorkload +import org.opendc.compute.workload.internal.HpcSampledComputeWorkload +import org.opendc.compute.workload.internal.LoadSampledComputeWorkload +import org.opendc.compute.workload.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt index 83393896..4d9ef15d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt @@ -20,19 +20,20 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.util +package org.opendc.compute.workload import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.failure.HostFaultInjector import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext /** * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. */ -interface FailureModel { +public interface FailureModel { /** * Construct a [HostFaultInjector] for the specified [service]. */ - fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt index 89b4a31c..be7120b9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt @@ -21,7 +21,7 @@ */ @file:JvmName("FailureModels") -package org.opendc.experiments.capelin +package org.opendc.compute.workload import org.apache.commons.math3.distribution.LogNormalDistribution import org.apache.commons.math3.random.Well19937c @@ -30,12 +30,11 @@ import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.failure.HostFaultInjector import org.opendc.compute.simulator.failure.StartStopHostFault import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.util.FailureModel import java.time.Clock import java.time.Duration +import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.ln -import kotlin.random.Random /** * Obtain a [FailureModel] based on the GRID'5000 failure trace. @@ -43,14 +42,15 @@ import kotlin.random.Random * This fault injector uses parameters from the GRID'5000 failure trace as described in * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. */ -fun grid5000(failureInterval: Duration, seed: Int): FailureModel { +public fun grid5000(failureInterval: Duration): FailureModel { return object : FailureModel { override fun createInjector( context: CoroutineContext, clock: Clock, - service: ComputeService + service: ComputeService, + random: Random ): HostFaultInjector { - val rng = Well19937c(seed) + val rng = Well19937c(random.nextLong()) val hosts = service.hosts.map { it as SimHost }.toSet() // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 @@ -60,7 +60,7 @@ fun grid5000(failureInterval: Duration, seed: Int): FailureModel { clock, hosts, iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) ) } @@ -68,30 +68,3 @@ fun grid5000(failureInterval: Duration, seed: Int): FailureModel { override fun toString(): String = "Grid5000FailureModel" } } - -/** - * Obtain the [HostFaultInjector] to use for the experiments. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 303a6a8c..40484b68 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,23 +20,30 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.compute.workload -import java.util.UUID +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.time.Instant +import java.util.* /** - * An entry in a workload trace. + * A virtual machine workload. * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace fragments that belong to this VM. */ -public data class TraceEntry<out T>( +public data class VirtualMachine( val uid: UUID, val name: String, - val start: Long, - val workload: T, - val meta: Map<String, Any> + val cpuCount: Int, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: Sequence<SimTraceWorkload.Fragment>, ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index e3d15c3b..4172d729 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.export.parquet +package org.opendc.compute.workload.export.parquet import mu.KotlinLogging import org.apache.avro.Schema @@ -39,7 +39,7 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. */ -abstract class ParquetDataWriter<in T>( +public abstract class ParquetDataWriter<in T>( path: File, private val schema: Schema, bufferSize: Int = 4096 @@ -113,7 +113,7 @@ abstract class ParquetDataWriter<in T>( /** * Write the specified metrics to the database. */ - fun write(data: T) { + public fun write(data: T) { val exception = exception if (exception != null) { throw IllegalStateException("Writer thread failed", exception) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt index b057e932..f41a2241 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.export.parquet +package org.opendc.compute.workload.export.parquet import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData @@ -31,7 +31,7 @@ import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { private val serverWriter = ParquetServerDataWriter( File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 58388cb1..37066a0d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.export.parquet +package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -29,6 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.HostData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional import java.io.File /** @@ -74,7 +77,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : override fun toString(): String = "host-writer" - companion object { + private companion object { private val SCHEMA: Schema = SchemaBuilder .record("host") .namespace("org.opendc.telemetry.compute") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 43b5f469..bea23d32 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.export.parquet +package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -29,8 +29,10 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.ServerData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional import java.io.File -import java.util.* /** * A Parquet event writer for [ServerData]s. @@ -71,7 +73,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun toString(): String = "server-writer" - companion object { + private companion object { private val SCHEMA: Schema = SchemaBuilder .record("server") .namespace("org.opendc.telemetry.compute") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 2928f445..47824b29 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,13 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.export.parquet +package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.io.File /** @@ -47,7 +48,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : override fun toString(): String = "service-writer" - companion object { + private companion object { private val SCHEMA: Schema = SchemaBuilder .record("service") .namespace("org.opendc.telemetry.compute") diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..9b2bec55 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf<VirtualMachine>() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..52f4c672 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf<VirtualMachine>() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..ef6de729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + val res = mutableListOf<VirtualMachine>() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt new file mode 100644 index 00000000..d657ff01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] from a trace. + */ +internal class TraceComputeWorkload(val name: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + return loader.get(name) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt new file mode 100644 index 00000000..f3dc1e9e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.topology + +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerDriver +import java.util.* + +/** + * Description of a physical host that will be simulated by OpenDC and host the virtual machines. + * + * @param uid Unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param model The physical model of the machine. + * @param powerDriver The [PowerDriver] to model the power consumption of the machine. + * @param hypervisor The hypervisor implementation to use. + */ +public data class HostSpec( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerDriver: PowerDriver, + val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt index a968b043..3b8dc918 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt @@ -20,16 +20,14 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.env - -import java.io.Closeable +package org.opendc.compute.workload.topology /** - * An interface for reading descriptions of topology environments into memory. + * Representation of the environment of the compute service, describing the physical details of every host. */ -public interface EnvironmentReader : Closeable { +public interface Topology { /** - * Read the environment into a list. + * Resolve the [Topology] into a list of [HostSpec]s. */ - public fun read(): List<MachineDef> + public fun resolve(): List<HostSpec> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt new file mode 100644 index 00000000..74f9a1f8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TopologyHelpers") +package org.opendc.compute.workload.topology + +import org.opendc.compute.workload.ComputeWorkloadRunner + +/** + * Apply the specified [topology] to the given [ComputeWorkloadRunner]. + */ +public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) { + val hosts = topology.resolve() + for (spec in hosts) { + registerHost(spec, optimize) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt index 9549af42..67f9626c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt @@ -20,19 +20,20 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.compute.workload.util import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.File import java.io.InputStream /** * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. */ -class PerformanceInterferenceReader { +public class PerformanceInterferenceReader { /** * The [ObjectMapper] to use. */ @@ -43,10 +44,17 @@ class PerformanceInterferenceReader { } /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): List<VmInterferenceGroup> { + return mapper.readValue(file) + } + + /** * Read the performance interface model from the input. */ - fun read(input: InputStream): List<VmInterferenceGroup> { - return input.use { mapper.readValue(input) } + public fun read(input: InputStream): List<VmInterferenceGroup> { + return mapper.readValue(input) } private data class GroupMixin( diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt index fbc39b87..c79f0584 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.compute.workload.util import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json index 1be5852b..1be5852b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json +++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 7dadd14d..4bcbaf61 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,6 +31,8 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) + api(projects.opendcCompute.opendcComputeWorkload) + implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) @@ -38,16 +40,14 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.opentelemetry.semconv) - implementation(libs.kotlin.logging) implementation(libs.config) - implementation(libs.progressbar) - implementation(libs.clikt) + implementation(libs.kotlin.logging) + implementation(libs.jackson.databind) implementation(libs.jackson.module.kotlin) implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) + implementation(libs.opentelemetry.semconv) - implementation(libs.parquet) testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt index faabe5cb..31e8f961 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt @@ -22,7 +22,8 @@ package org.opendc.experiments.capelin -import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.compute.workload.composite +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { ) override val workload: Workload by anyOf( - CompositeWorkload( + Workload( "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad + composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0) ), - CompositeWorkload( + Workload( "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad + composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75) ), - CompositeWorkload( + Workload( "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad + composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5) ), - CompositeWorkload( + Workload( "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad + composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25) ), - CompositeWorkload( + Workload( "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad + composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0) ) ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt index e1cf8517..cd093e6c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt index a995e467..73e59a58 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt @@ -22,8 +22,10 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByHpc +import org.opendc.compute.workload.sampleByHpcLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.harness.dsl.anyOf @@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) + Workload("solvinity", trace("solvinity").sampleByHpc(0.0)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpc(1.0)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt index 49559e0e..9d5717bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt index 1aac4f9e..7ab586b3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6261ebbf..630b76c4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,16 +24,16 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging -import org.opendc.experiments.capelin.env.ClusterEnvironmentReader -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor -import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -43,10 +43,8 @@ import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File -import java.io.FileInputStream import java.time.Duration import java.util.* -import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong /** @@ -91,33 +89,19 @@ abstract class Portfolio(name: String) : Experiment(name) { abstract val allocationPolicy: String /** - * A map of trace readers. + * A helper class to load workload traces. */ - private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>() + private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path"))) /** * Perform a single trial for this portfolio. */ override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) - } - } - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() - .read(FileInputStream(config.getString("interference-model"))) + .read(File(config.getString("interference-model"))) .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -125,14 +109,13 @@ abstract class Portfolio(name: String) : Experiment(name) { val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) val failureModel = if (operationalPhenomena.failureFrequency > 0) - grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null - val simulator = ComputeServiceSimulator( + val runner = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, - environment.read(), failureModel, performanceInterferenceModel ) @@ -142,17 +125,22 @@ abstract class Portfolio(name: String) : Experiment(name) { "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { - simulator.run(trace) + // Instantiate the desired topology + runner.apply(topology) + + // Run the workload trace + runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { - simulator.close() + runner.close() metricReader.close() monitor.close() } - val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt index b6d3b30c..17ec48d4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt index 90840db8..98eb989d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt deleted file mode 100644 index babd8ada..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.env - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.File -import java.io.FileInputStream -import java.io.InputStream -import java.util.* - -/** - * A [EnvironmentReader] for the internal environment format. - * - * @param input The input stream describing the physical cluster. - */ -class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader { - /** - * Construct a [ClusterEnvironmentReader] for the specified [file]. - */ - constructor(file: File) : this(FileInputStream(file)) - - override fun read(): List<MachineDef> { - var clusterIdCol = 0 - var speedCol = 0 - var numberOfHostsCol = 0 - var memoryPerHostCol = 0 - var coresPerHostCol = 0 - - var clusterIdx = 0 - var clusterId: String - var speed: Double - var numberOfHosts: Int - var memoryPerHost: Long - var coresPerHost: Int - - val nodes = mutableListOf<MachineDef>() - val random = Random(0) - - input.bufferedReader().use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the file - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(";") - - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - clusterIdCol = header["ClusterID"]!! - speedCol = header["Speed"]!! - numberOfHostsCol = header["numberOfHosts"]!! - memoryPerHostCol = header["memoryCapacityPerHost"]!! - coresPerHostCol = header["coreCountPerHost"]!! - return@forEachIndexed - } - - clusterIdx++ - clusterId = values[clusterIdCol].trim() - speed = values[speedCol].trim().toDouble() * 1000.0 - numberOfHosts = values[numberOfHostsCol].trim().toInt() - memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L - coresPerHost = values[coresPerHostCol].trim().toInt() - - val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) - - repeat(numberOfHosts) { - nodes.add( - MachineDef( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$it", - mapOf("cluster" to clusterId), - MachineModel( - List(coresPerHost) { coreId -> - ProcessingUnit(unknownProcessingNode, coreId, speed) - }, - listOf(unknownMemoryUnit) - ), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - ) - } - } - } - - return nodes - } - - override fun close() { - input.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt index c4ddd158..a2e71243 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -22,23 +22,12 @@ package org.opendc.experiments.capelin.model -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} +import org.opendc.compute.workload.ComputeWorkload /** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. + * A single workload originating from a trace. + * + * @param name the name of the workload. + * @param source The source of the workload data. */ -public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) : - Workload(name, -1.0) +data class Workload(val name: String, val source: ComputeWorkload) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt new file mode 100644 index 00000000..b8b65d28 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.topology + +/** + * Definition of a compute cluster modeled in the simulation. + * + * @param id A unique identifier representing the compute cluster. + * @param name The name of the cluster. + * @param cpuCount The total number of CPUs in the cluster. + * @param cpuSpeed The speed of a CPU in the cluster in MHz. + * @param memCapacity The total memory capacity of the cluster (in MiB). + * @param hostCount The number of hosts in the cluster. + * @param memCapacityPerHost The memory capacity per host in the cluster (MiB). + * @param cpuCountPerHost The number of CPUs per host in the cluster. + */ +public data class ClusterSpec( + val id: String, + val name: String, + val cpuCount: Int, + val cpuSpeed: Double, + val memCapacity: Double, + val hostCount: Int, + val memCapacityPerHost: Double, + val cpuCountPerHost: Int +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt new file mode 100644 index 00000000..5a175f2c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.topology + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.MappingIterator +import com.fasterxml.jackson.databind.ObjectReader +import com.fasterxml.jackson.dataformat.csv.CsvMapper +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import java.io.File +import java.io.InputStream + +/** + * A helper class for reading a cluster specification file. + */ +class ClusterSpecReader { + /** + * The [CsvMapper] to map the environment file to an object. + */ + private val mapper = CsvMapper() + + /** + * The [ObjectReader] to convert the lines into objects. + */ + private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema) + + /** + * Read the specified [file]. + */ + fun read(file: File): List<ClusterSpec> { + return reader.readValues<Entry>(file).use { read(it) } + } + + /** + * Read the specified [input]. + */ + fun read(input: InputStream): List<ClusterSpec> { + return reader.readValues<Entry>(input).use { read(it) } + } + + /** + * Convert the specified [MappingIterator] into a list of [ClusterSpec]s. + */ + private fun read(it: MappingIterator<Entry>): List<ClusterSpec> { + val result = mutableListOf<ClusterSpec>() + + for (entry in it) { + val def = ClusterSpec( + entry.id, + entry.name, + entry.cpuCount, + entry.cpuSpeed * 1000, // Convert to MHz + entry.memCapacity * 1000, // Convert to MiB + entry.hostCount, + entry.memCapacityPerHost * 1000, + entry.cpuCountPerHost + ) + result.add(def) + } + + return result + } + + private open class Entry( + @JsonProperty("ClusterID") + val id: String, + @JsonProperty("ClusterName") + val name: String, + @JsonProperty("Cores") + val cpuCount: Int, + @JsonProperty("Speed") + val cpuSpeed: Double, + @JsonProperty("Memory") + val memCapacity: Double, + @JsonProperty("numberOfHosts") + val hostCount: Int, + @JsonProperty("memoryCapacityPerHost") + val memCapacityPerHost: Double, + @JsonProperty("coreCountPerHost") + val cpuCountPerHost: Int + ) + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("ClusterID", CsvSchema.ColumnType.STRING) + .addColumn("ClusterName", CsvSchema.ColumnType.STRING) + .addColumn("Cores", CsvSchema.ColumnType.NUMBER) + .addColumn("Speed", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory", CsvSchema.ColumnType.NUMBER) + .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER) + .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER) + .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setColumnSeparator(';') + .setUseHeader(true) + .build() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt new file mode 100644 index 00000000..5ab4261a --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TopologyFactories") +package org.opendc.experiments.capelin.topology + +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.topology.Topology +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import java.io.File +import java.io.InputStream +import java.util.* +import kotlin.math.roundToLong + +/** + * A [ClusterSpecReader] that is used to read the cluster definition file. + */ +private val reader = ClusterSpecReader() + +/** + * Construct a [Topology] from the specified [file]. + */ +fun clusterTopology( + file: File, + powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0), + random: Random = Random(0) +): Topology = clusterTopology(reader.read(file), powerModel, random) + +/** + * Construct a [Topology] from the specified [input]. + */ +fun clusterTopology( + input: InputStream, + powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0), + random: Random = Random(0) +): Topology = clusterTopology(reader.read(input), powerModel, random) + +/** + * Construct a [Topology] from the given list of [clusters]. + */ +fun clusterTopology( + clusters: List<ClusterSpec>, + powerModel: PowerModel, + random: Random = Random(0) +): Topology { + return object : Topology { + override fun resolve(): List<HostSpec> { + val hosts = mutableListOf<HostSpec>() + for (cluster in clusters) { + val cpuSpeed = cluster.cpuSpeed + val memoryPerHost = cluster.memCapacityPerHost.roundToLong() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val machineModel = MachineModel( + List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, + listOf(unknownMemoryUnit) + ) + + repeat(cluster.hostCount) { + val spec = HostSpec( + UUID(random.nextLong(), it.toLong()), + "node-${cluster.name}-$it", + mapOf("cluster" to cluster.id), + machineModel, + SimplePowerDriver(powerModel) + ) + + hosts += spec + } + } + + return hosts + } + + override fun toString(): String = "ClusterSpecTopology" + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt deleted file mode 100644 index 0bf4ada6..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param rawReaders The internal raw trace readers to use. - * @param workload The workload to read. - * @param seed The seed to use for sampling. - */ -public class ParquetTraceReader( - rawReaders: List<RawParquetTraceReader>, - workload: Workload, - seed: Int -) : TraceReader<SimWorkload> { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt deleted file mode 100644 index ed82217d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.util.parquet.LocalInputFile -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param selectedVms The list of VMs to read from the trace. - */ -class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { - private val logger = KotlinLogging.logger {} - - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - time, - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf<UUID>() - val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() - - val metaReader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = selectedVms.ifEmpty { entries.keys } - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info { "Processing VM $id" } - - val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry(uid, id, submissionTime, workload, meta) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt deleted file mode 100644 index cb32ce88..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.SamplingStrategy -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -public fun sampleWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<SimWorkload>>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumOf { it.meta.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List<TraceEntry<SimWorkload>> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumOf { it.meta.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf<TraceEntry<SimWorkload>>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.meta.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.meta.getValue("total-load") as Double - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { - val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) - return entry.copy(uid = uid) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt deleted file mode 100644 index 7dd8161d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder - -/** - * Schema for the resources table in the trace. - */ -val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("submissionTime") - .requiredLong("endTime") - .requiredInt("maxCores") - .requiredLong("requiredMemory") - .endRecord() - -/** - * Schema for the resource states table in the trace. - */ -val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("time") - .requiredLong("duration") - .requiredInt("cores") - .requiredDouble("cpuUsage") - .requiredLong("flops") - .endRecord() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt index b55bd577..67de2777 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.experiments.capelin.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml index d1c01b8e..d46b50c3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml +++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml @@ -36,7 +36,7 @@ <Logger name="org.opendc.experiments.capelin" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> - <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false"> + <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false"> <AppenderRef ref="Console"/> </Logger> <Logger name="org.apache.hadoop" level="warn" additivity="false"> diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 727530e3..ac2ea646 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,16 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.experiments.capelin.env.ClusterEnvironmentReader -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.compute.workload.* +import org.opendc.compute.workload.topology.Topology +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor @@ -66,6 +62,11 @@ class CapelinIntegrationTest { private lateinit var computeScheduler: FilterScheduler /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + /** * Setup the experimental environment. */ @BeforeEach @@ -75,6 +76,7 @@ class CapelinIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } /** @@ -82,26 +84,24 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val traceReader = createTestTraceReader() - val environmentReader = createTestEnvironmentReader() - - val simulator = ComputeServiceSimulator( + val workload = createTestWorkload(1.0) + val runner = ComputeWorkloadRunner( coroutineContext, clock, - computeScheduler, - environmentReader.read(), + computeScheduler ) - - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val topology = createTopology() + val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.run(traceReader) + runner.apply(topology) + runner.run(workload, 0) } finally { - simulator.close() + runner.close() metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -131,20 +131,19 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) - val environmentReader = createTestEnvironmentReader("single") + val workload = createTestWorkload(0.25, seed) - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, - computeScheduler, - environmentReader.read(), + computeScheduler ) - + val topology = createTopology("single") val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.run(traceReader) + simulator.apply(topology) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -162,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -174,28 +173,26 @@ class CapelinIntegrationTest { */ @Test fun testInterference() = runBlockingSimulation { - val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) - val environmentReader = createTestEnvironmentReader("single") - + val seed = 0 + val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = PerformanceInterferenceReader() .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, - environmentReader.read(), interferenceModel = performanceInterferenceModel ) - + val topology = createTopology("single") val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.run(traceReader) + simulator.apply(topology) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -213,10 +210,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -226,21 +223,19 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) - val environmentReader = createTestEnvironmentReader("single") - - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, - environmentReader.read(), - grid5000(Duration.ofDays(7), seed) + grid5000(Duration.ofDays(7)) ) - + val topology = createTopology("single") + val workload = createTestWorkload(0.25, seed) val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.run(traceReader) + simulator.apply(topology) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -258,31 +253,28 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } } ) } /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return ParquetTraceReader( - listOf(RawParquetTraceReader(File("src/test/resources/trace"))), - Workload("test", fraction), - seed - ) + private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> { + val source = trace("bitbrains-small").sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed.toLong())) } /** - * Obtain the environment reader for the test. + * Obtain the topology factory for the test. */ - private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + private fun createTopology(name: String = "topology"): Topology { val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) - return ClusterEnvironmentReader(stream) + return stream.use { clusterTopology(stream) } } class TestExperimentReporter : ComputeMonitor { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet Binary files differnew file mode 100644 index 00000000..da6e5330 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet Binary files differnew file mode 100644 index 00000000..fe0a254c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet Binary files differdeleted file mode 100644 index ee76d38f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet +++ /dev/null diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet Binary files differdeleted file mode 100644 index 9b1cde13..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet +++ /dev/null diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index 74384480..7d06ee62 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -36,5 +36,4 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcUtils) - implementation(libs.yaml) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt index 0c995f06..2694700c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.power -import org.yaml.snakeyaml.Yaml import kotlin.math.ceil import kotlin.math.floor import kotlin.math.max @@ -37,8 +36,6 @@ import kotlin.math.min * @see <a href="http://www.spec.org/power_ssj2008/results/res2011q1/">Machines used in the SPEC benchmark</a> */ public class InterpolationPowerModel(private val powerValues: List<Double>) : PowerModel { - public constructor(hardwareName: String) : this(loadAveragePowerValue(hardwareName)) - public override fun computePower(utilization: Double): Double { val clampedUtilization = min(1.0, max(0.0, utilization)) val utilizationFlr = floor(clampedUtilization * 10).toInt() @@ -63,14 +60,4 @@ public class InterpolationPowerModel(private val powerValues: List<Double>) : Po * @return the power consumption for the given utilization percentage */ private fun getAveragePowerValue(index: Int): Double = powerValues[index] - - private companion object { - private fun loadAveragePowerValue(hardwareName: String, path: String = "spec_machines.yml"): List<Double> { - val content = this::class - .java.classLoader - .getResourceAsStream(path) - val hardwareToAveragePowerValues: Map<String, List<Double>> = Yaml().load(content) - return hardwareToAveragePowerValues.getOrDefault(hardwareName, listOf()) - } - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 48be8e1a..5a4c4f44 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -27,6 +27,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.min /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -89,15 +90,16 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val return SimResourceCommand.Idle(timestamp) } + val cores = min(cpu.node.coreCount, fragment.cores) val usage = if (fragment.cores > 0) - fragment.usage / fragment.cores + fragment.usage / cores else 0.0 val deadline = timestamp + fragment.duration val duration = deadline - now val work = duration * usage / 1000 - return if (cpu.id < fragment.cores && work > 0.0) + return if (cpu.id < cores && work > 0.0) SimResourceCommand.Consume(work, usage, deadline) else SimResourceCommand.Idle(deadline) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt index ac2ed303..7852534a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt @@ -61,7 +61,8 @@ internal class PowerModelTest { @Test fun `compute power draw by the SPEC benchmark model`() { - val powerModel = InterpolationPowerModel("IBMx3550M3_XeonX5675") + val ibm = listOf(58.4, 98.0, 109.0, 118.0, 128.0, 140.0, 153.0, 170.0, 189.0, 205.0, 222.0) + val powerModel = InterpolationPowerModel(ibm) assertAll( { assertEquals(58.4, powerModel.computePower(0.0)) }, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index e2e5ea6d..219002e0 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -47,7 +47,7 @@ public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count") /** * Memory capacity for the resource in KB. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index 1933967e..b683923b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -60,7 +60,7 @@ public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("reso * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus") +public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count") /** * Total CPU capacity of the resource in MHz. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-trace/opendc-trace-azure/build.gradle.kts index b0c0318f..8bde56cb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt +++ b/opendc-trace/opendc-trace-azure/build.gradle.kts @@ -20,19 +20,17 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.env +description = "Support for Azure VM traces in OpenDC" -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} -/** - * A definition of a machine in a cluster. - */ -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map<String, Any>, - val model: MachineModel, - val powerModel: PowerModel -) +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + implementation(libs.jackson.dataformat.csv) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index f98f4b2c..84c9b347 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* @@ -37,7 +37,7 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa /** * The partitions that belong to the table. */ - private val partitions = Files.walk(path, 1) + private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) .filter { !Files.isDirectory(it) && it.extension == "csv" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) .toSortedMap() @@ -68,9 +68,9 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index f80c0e82..c17a17ab 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser @@ -53,7 +53,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta when (parser.currentName) { "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) "vm id" -> id = parser.text - "avg cpu" -> cpuUsagePct = parser.doubleValue + "CPU avg cpu" -> cpuUsagePct = parser.doubleValue } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt index c9d4f7eb..96ee3158 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* @@ -38,7 +38,7 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, + RESOURCE_CPU_COUNT, RESOURCE_MEM_CAPACITY ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index b712b854..5ea97483 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -20,12 +20,11 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.apache.parquet.example.Paper.schema import org.opendc.trace.* import java.time.Instant @@ -68,7 +67,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -79,8 +78,8 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> id RESOURCE_START_TIME -> startTime RESOURCE_STOP_TIME -> stopTime - RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -94,7 +93,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_NCPUS -> cpuCores + RESOURCE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt index 24c60bab..c7e7dc36 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* @@ -29,7 +29,7 @@ import java.nio.file.Path /** * [Trace] implementation for the Azure v1 VM traces. */ -class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { +public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun containsTable(name: String): Boolean = name in tables diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 744e43a0..1230d857 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.azure +package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser @@ -32,11 +32,11 @@ import kotlin.io.path.exists /** * A format implementation for the Azure v1 format. */ -class AzureTraceFormat : TraceFormat { +public class AzureTraceFormat : TraceFormat { /** * The name of this trace format. */ - override val name: String = "azure-v1" + override val name: String = "azure" /** * The [CsvFactory] used to create the parser. diff --git a/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..08e75529 --- /dev/null +++ b/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.azure.AzureTraceFormat diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt new file mode 100644 index 00000000..e5735f0d --- /dev/null +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.azure + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [AzureTraceFormat] class. + */ +class AzureTraceFormatTest { + private val format = AzureTraceFormat() + + @Test + fun testTraceExists() { + val url = File("src/test/resources/trace").toURI().toURL() + assertDoesNotThrow { + format.open(url) + } + } + + @Test + fun testTraceDoesNotExists() { + val url = File("src/test/resources/trace").toURI().toURL() + assertThrows<IllegalArgumentException> { + format.open(URL(url.toString() + "help")) + } + } + + @Test + fun testTables() { + val url = File("src/test/resources/trace").toURI().toURL() + val trace = format.open(url) + + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + } + + @Test + fun testTableExists() { + val url = File("src/test/resources/trace").toURI().toURL() + val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val url = File("src/test/resources/trace").toURI().toURL() + val trace = format.open(url) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testResources() { + val url = File("src/test/resources/trace").toURI().toURL() + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + ) + + reader.close() + } + + @Test + fun testSmoke() { + val url = File("src/test/resources/trace").toURI().toURL() + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(2.86979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } + ) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv new file mode 100644 index 00000000..db6ddf8a --- /dev/null +++ b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv @@ -0,0 +1,100 @@ +0,+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q,2.052803,3.911587,2.86979
+0,2zrgeOqUDy+l0GVi5NXudU+3sqZH+nLowfcz+D/JsCymTXbKrRf1Hr3OjAtxjnKm,1.64695,8.794403,3.254472
+0,/34Wh1Kq/qkNkW0tQrMiQ1eZ8hg9hHopydCzsXriefhgrn+0Rg1j22k1IHcV6PIQ,2.440088,6.941048,4.33624
+0,2lzdXk1Rqn1ibH2kZhGamYTMvVcRP6+x8b5zGiD/8t++5BQhzU18hGaL5sfR01Lo,0.302992,2.046712,0.970692
+0,0GrUQuLhCER5bWWcoJAgblPJWkaU4v3nf+NUrZnFTlXWEK99qgTRBTkjjUjJVAqA,1.515922,4.471657,2.438805
+0,2I8OpI6bMkdzL3HYLz4KlBcDhy2VTEm3skQbOvEo9rLoxryB0iB9iVh3rGd5DW2j,0.148552,0.315007,0.264341
+0,2IuuDcRMd97gln/+CrgPqI/fwffx67s87T1odKrA0wLYf8YuzGooHdKihitv2Q+s,0.169838,2.277277,0.859669
+0,2KaB1faO0ZB2KqB8MGwasWkqRLJHIE6+2wPuhzlzLNEUyeGzo0dU7brFa/cll/VJ,0.539162,1.371926,0.782212
+0,2BMVXt472mr/Y8m1vaaGoyGTSXcLvXk968PCHixwCDjPSgCm7yYSimGuBw7VPIiS,3.625195,7.211996,4.807884
+0,/3+EY60PnzKwod6nAUGBFSDDpBBOVEVUi90JWWWjPAlNyTUrGwlfQcSDoSkRumD7,0.180582,1.313473,0.43792
+0,+hulsuci78MKSG60G/gHJLqmz5/TFEB3WpS6HI1G1mm052le8oeemF3kz3eoPsnS,2.653344,9.983403,4.262461
+0,0O4otykohyRcsqqsg68kqo6ZCY6sL6eQLHUMYZxGVRhwQmTXRUN89izib3pOucrC,0.72983,4.516831,1.846142
+0,239KfRqrlUdyYuU0ubcASPKztu3q7hernahrolO5AczjUFI/QgoU+OoKzPuivFHQ,1.42953,11.553488,4.271241
+0,/SVzWHvPhr7KAIOUFr10EK8WdKXbrJojgcc4IGvutJ2S6HpRMD0zTfv/h0720+Q6,1.676634,6.703915,3.102252
+0,2a7bYEHqZvcgOeos5Q3J5qxpY4lXinv8M9mORfel5DlWRut0JynZtobNGNlBWn41,0.54178,8.239316,2.234877
+0,1NwFYwEAgv8qnCaWzzWv9hHj0TIJAZ2HT+iH+dsZKeSAPGoJGyVSDB+Zj4EuqWRC,2.933124,4.630701,3.975568
+0,3rg4SRyS/p6eMuGCJpjkz4oHzXSeeF16a7jJ9GAAYPiAQAsQNOEjHOe07on5RbjK,0.719536,3.383279,1.506528
+0,0DVV+uR/jr4XbwYQhVf2Yg0Kg7DfIDa7qJNzqvjVgEqGRJAUisrnYFv7AWr1k7by,0.949333,22.157649,3.751856
+0,3bHtb6EIFo9yXByIhpVDOJ7bzbIQvnGGb+jm8eOsEf0eKbrKMJvUiOYc6Wq3DXbR,3.504534,15.581505,6.388424
+0,0O5yc/ZVSHWxf4UHf/1b8Nut9raakorgqDwGV9k7TJdq55alNeMDB7CREuxZystP,7.587743,20.323464,16.540802
+0,0ZbYi+cMH7hCzT+8ICYVp5ZgcRUFNKsODuH09bbPdPioUPCPkBK2PM2oHhE3y4I6,2.694185,6.361789,4.55337
+0,1jw70sEl89jY2iRpd38PuYSBiOcuwe6tF4Q+YuGBJg20+gRIW3A7H3WZ+uL0EVmb,3.570395,5.707428,4.233997
+0,24MvpVXzcNO2qxwF4hMwCToFTBfoAE5xUQ4L6fwfWuBZ1GW06hHh5jWwWCu+8lPm,5.102273,8.678012,6.369649
+0,2gHzFAqM+fL7f1wtNETuzSoM7I6xlEWk2BJmj1SNXly/7z1RQFmwYRXU49DiYciJ,0.27374,54.447146,5.445003
+0,/hCom+lGMIkeE1wQi+VTFh+zzgbikbO0jQDzchDMCUNSgo6cEJfD1sIT2Ok4NlD6,0.170892,1.843549,0.737087
+0,2UwesOu8HXTdHyj0jd1agckz1KH5+Z4KOFe+wKFo9uvRI4GalozAPaxsMrBmx7Wo,3.349887,6.272554,4.425039
+0,1V8Fr/ZhjQcxql5s9p3hA1b0Wx6Sx9e+np1OImlp3GKyleH87bYjmQLZJouKYJR2,2.022219,4.724097,2.616506
+0,23E/SPMZKCUWz8nBmuCdbNBWf9ou6IQuZjmh0x2/icPrbLLvUk5SvbTjwqoLQxBX,0,0.46365,0.178483
+0,0Mj8nT0fnkeMIbcTBf27pOtUuTtMZH8uAZqAViSaye+9mBIjsNPmU6Z5hLK6f2I0,15.023186,23.297875,18.965327
+0,2xM0uOcqSowNzsbFbzhy5J1Ms2vv0jVQ5aM+J2E/LCBzTVKPrCCeWQ/r/cKmS1Tm,8.272075,9.415241,8.797159
+0,0MYQXyW75q9UURkn+O/V6iww0JaBl2qRG0Mh2bqRcuU5/Ws+7HJMPKSzVKlUEgcU,3.798828,8.915124,4.856879
+0,/HQfnMjgclpCxPod9jmGVQxfTnsjyNWA4KNkLMn4IKRlqheUo9AhhWv4vAumZNqg,4.788548,7.269977,6.640435
+0,0Q2PP+9O7LcnNI7AJQQR7pwM4ISG4024Z+INOw+TWgf2DCl8/prdGC7QJRGjc+Aa,0.10703,0.183798,0.136907
+0,/zLQxB1DGXC7iK7JeyYrUSguf6DjNA1MVTJzieRWmcobm0M+xgd28r842y3p5u5J,1.306953,3.22913,2.226509
+0,42cXpXkVqdXH/ok/tD46zKKCToy0k6HXoH3x7eeo4+zIva3IJKle5xfSEW3R45ON,1.018462,3.240817,2.196357
+0,+9HYwMx1Ckj15bJswEycBgiBSfrBw5NJE3p86IeFpFYKKxdw3NzMPTFKpg67XhsF,1.859664,7.255261,3.501303
+0,10KKTL95cApo6Pf24KZqgrM67v4M6rgZBoX+w/I3j4KS66FNhKomGnap9H8SVAvy,0.041225,2.593651,0.25894
+0,+LyaeKb1faiLEjAzynXF3xO/ZAho1R/Zyh1H4d45+NGsIJR6ryUTDmhyNvMh1wQ9,4.614357,11.692623,6.05005
+0,1SS5EeD9rxdWRFYBkR36PAd96w+Q7V2V4fDcc/2IJ1L07In7RGpQk/HVcOTKd78w,0.020435,0.515471,0.135453
+0,+HFoxb6Eu9kwzVkxs+A+9Q7zXa4aSIcOFm3AnYDCTQQMYyf6EST9nSHslGhUkgAD,8.53904,48.459572,16.166212
+0,+N+B5FPJIUVyH9v1Zcc+kjSTNvULkosDBM48N2JkDjhuVhQtWSfYQMQTQkGeVjLi,3.139119,99.036916,51.090982
+0,1ey9c7Hc1FyxLVbESoty7AkXbuENFSDXRAZiizFifRmJNM6IEx9eNu3bkUR+qCUJ,2.466582,5.842213,3.765056
+0,35F/52yPsKPGondM8xnzX68EKiKiKiZMDqsVnvc9ZOAc/rS3zvQ6YYj3QkLAHFhN,1.963258,43.494868,16.459037
+0,2KX+BTc0TPZOtCgbzKtKvP1yrM+Cc3WQU9DPkZDFD/5aNN/aPV40aQCKwW/HeTzh,1.040522,5.961609,3.305858
+0,+8X+qRHRLwwgj70uuXrkus7lrNtjMeTHfy5yQgymNJI+yFd5pbhRfStfS7lkVOhP,0.436353,15.995153,1.431229
+0,/g5MAtFnYaMO5MpJg40BsFmhS22s0tfwHiivGhPbcZ+KgEAtNxKkFdZYDtrDUUFO,6.905489,8.196952,7.527238
+0,/ke0seVq80UFQeXSTUh5hTrjghtn5qqWf38lQVTis+/ZR6Pdv5vdAotz4dvZcKDp,6.444482,23.136676,15.470455
+0,+tQeKqKqbAui7YXK0Efk3GUnvbzM+0pOpmOJ6OhkMSozjRyl5tHl7+mZwFznU3Mk,17.90259,20.095464,18.937014
+0,/hiC5yD45GhNtMpJTVwVF5ZnNNWfEHttESv/+KH6go9FBoncns+CuQ1M92c0xzFA,2.290396,2.609893,2.523336
+0,0i9+1LVd2t4m1KScDuoJnAAEL0bz9UGXh2iLAGV/8Eq5hTsAliyraV7j6wsf2MZX,4.266491,16.607137,6.929279
+0,2PVcv0/vy8mIjzH7CiB9cJU737jRi6kAO7PhqkxEWA4GrxvaCsK3ZDckhD8YR04U,1.048596,2.309172,1.447266
+0,/kbT+MIfY7jEW2Nn+TKf5BKkLAmBslDqKuZ8HI2Ire6eMKinGP7aTt6SY77vt8PK,2.409783,7.79851,5.552826
+0,2cCRKSXs9v9tPskjJn8UmV15qynI3I3GLPTor/i81nxh5Ocwb7Fq1zwEN5zmtXyx,0.356014,1.468193,0.781642
+0,2qsVNbcvPD0H3cs/p/6MTpuvUBtr5QN3iavAmkCQBCtrHcEpgskYVJf/6WQkEhOF,2.688901,85.501739,37.676562
+0,30FpxnoytvMKoGeJYqwnuL2mPbvKlxpjPIfVT8LKqqFl9smEksQjEzG3lgxhT4U7,2.499018,6.534664,3.508567
+0,/f1C+4xtoPaBxD+FoFdM52MiaWXZEqPqSnBxz4q4XMzoXabJvdddHchLrxc6SlYc,1.894231,15.683948000000001,3.199591
+0,30tz9NOV1bIKUB6uIOy4qZT8BVk3escZ0bWXBD9oedOQN1Qi06pplm7WM9iMvvvL,0.959278,63.599827,14.983399
+0,2q0sA6/4VZfksnucqVASzYgruD9T0219afuGrf3O/u8jpGHpn0k3oWvY35I7x8F/,2.694575,11.900751,5.254742
+0,/Qq/SKTnRJ4RZPWKIdCyPmYQUf+csOcFYS+rVD+kc1OkLboeKHK7CLV88wVVLlm9,55.553347,99.204744,93.215797
+0,2PJIXiy3/m1MNf4SQAQ9xU+LDqsHvyyCIWA2X0nB9kgLyVNh3g9xxpAeUpkXgvK6,0.591771,0.676084,0.628958
+0,/VIH23Tzi+711eCdsc7apDAoSBY6hcNqCu8oaZcPrUQmUXUyH8HJS7Z1DyhR6j/I,3.136726,5.477124,4.036594
+0,3/bNFRCZog1M2qwSCcwMYYos07f/9kRsfeFyaOmT0mNx3ldbNvRRbMBhoseq0DIg,2.993954,5.787727,4.272684
+0,3F+42xbLAiVPTJeHpyDwx6ZXcxArLFiMGGZTa9jmsLIpxxkBqC1QwN8mAwzDqWsU,3.488578,6.178318,4.692753
+0,08iqvtN8ilXeJdfiL86fde5JRTrjuLTp8guNabblV7QqkkAL23TwtLdwuFtg4P9G,3.64316,22.992153,10.256498
+0,0ZiQ/5P4mgnYud0uaI1lZCIJaCzrlEJdnAz8bcFMLDFryCrUJJDecbWQbLo6K69J,2.924592,4.261972,3.543138
+0,28JHlDFu72v9lIhjKLF+h9g1pyPq9+ruVET8NnBGKksclnvwx0WlQ066nh6doanS,1.2833,1.589682,1.353967
+0,3ClcWgHBEw8WzFSqnMYKUib9Abx6RDf3ITN8ivUilopa4t+UTJU0Y/U25sT/1okS,1.387814,2.764987,2.116221
+0,/qj8bL8dARqa83U6HwU/bUF5kLq12PKaebM0/2WrM2a3oH+BCC/IxFf1PjIWBNC5,23.139855,97.95723,75.918613
+0,17KWFIkHqLQpslptyD70Qof2iISdFN4IzZBc/WffQeds/tDjuZ/1O4KY68u10srE,2.374392,4.461708,3.201956
+0,3fNyZ1Bf9hUvTVDbHwh8Fh3E2i0BgPPL3QkkS9T0cjanDQA0u0z/Y5TSdXldEJM8,1.199056,3.188352,2.14033
+0,3DYNNYBvhBlVPHsg1uoo7ZVjKX5k1c0gZsfc8W0o0cJ1WJAI8f049TnSu/yIfp/m,1.305688,4.700476,2.216015
+0,2e9qO7smv0DTuXeR3VEzG2jztbM9wntJ3bMt6/LlN3RZBQzIY9vP7FFsphJC9bsW,0.087859,22.556549,10.203507
+0,3EeP6Vgbh292ahLWQJrInzehyR4Nuj2vNtdWuEbvFjKcmCc2i6VZVN4dQTRfIVxR,7.663198,22.199953,15.461753
+0,/Vi7oNg70eAzJHXwsCM9nzwBMg4l7cMyZhUT14V48AWjIAQzVYsbdI0KwNlBAXhK,0.61977,2.24158,1.181003
+0,4/c7nkT3SrtRRrRCsZxUJXxJjUr61iivwZxdihwPAtpCDUawKfPUzaq/05zFYBAk,2.667104,7.383679,4.050989
+0,1HYzfmk+s4SedWtOeHk4j5Zj52ateGX5bRFK5K3rwTVdB2A2m+3iwbL1IEzx8ir8,5.366892,12.404488,6.877072
+0,3vPq2HsXQ9SQT+URugEaQ3ezvstcGd5Bt9FIiFx1SrUfUrvvi/Gj8Nyw5DZhvyAR,3.014601,13.363316,4.535414
+0,2YbmUab2MqBMpvMaoaMP3zVxOhgqkNytraWdt/GG261oZ/tmgEB239WsbKJh1bE3,3.121409,98.73306,51.009852
+0,1IYQhDD8NGuAFnVPnffmt1yk20B9JHQI5DMC4Ny09pe6Sedik6YCIIVeBHIEo34W,1.512222,3.53396,2.379989
+0,1SSMSUcJ7qKM7q2yka80+ZP0yYWiYxGQxcJ8KBi4+TsDpv5FLUS6i2DHLMtXB3An,3.9704,4.345802,4.126586
+0,17CA6zpUCxW+Pdh2g5W0kTdlPlgWbBKz4YrHvbGP/Hmf13nZQBc/VZO7EL6nM75C,8.052588,16.023168,13.600106
+0,04rwScmEvRr0aU/mAE7aKtKFwowolGaTAPyQHuaVKEFmEVMAKxo+7UBCk3vRRRBd,2.221999,5.809178,3.021269
+0,1H9K/TW4c28Aob/H1O53cyQT7pHRww0L1ocyn19z1+MxC+k+5M/PgEx9B3zT/CNf,2.985884,7.584636,3.995057
+0,2fgXOaNZld/i7o20ULRNhCeL+o+vgZYzDOIhQ2n28TcGxXR047+F1b7QiD+l1Ypf,0.068074,0.884132,0.239792
+0,+0bAvqEMTl/RGyFmuz4zJH3DLMI6Q+iHapYn5BpbZI+0PNNfM7PXm/mojw+e8Xpn,3.238927,4.259525,3.611511
+0,3OdFPkhA5Q99wyfxmgyxPAhWyDLkV++XFtPL8pD3w5f8mBWbokeBwgk4gmNIxCOL,0.461767,10.466777,4.985617
+0,0UE8gxQAdCGY+WGN9yd9CL2ZGGqoyGQ2PzQGndwecce24GyTUnuvREbnMWBZZ7bG,0.730279,6.785359,3.363408
+0,/Uk/U5u4d+KNQVPD63pklfxeWc2zDAkUnrVmvxgRTuqNFbn90h8TuU5GZ+OamGQ5,0.105853,1.739301,0.262678
+0,2im96EJfLyxm7TPrtOR9m6Inq4E4/qR+AvP0TbnSdvzXI+N9gHh7C2fzppzcR0i8,0.325895,2.012216,0.802437
+0,+CrXBNhT3ch1hYU2e9IGs7wfjSLRkKYgidJYc42LlsH39cYtwdAX3wKm1OGlf+Kl,18.815771,40.850218,22.470045
+0,/hXRrrjPrAw8xDSsJnEwLdkRN1e42zJLE/HO5DXk5gbGLRmRx5H9n4T0UmraZ8uW,0.361838,0.831517,0.423214
+0,/sTadDDv8poFeLWS7lD/SEtEgWCBHXB1IaiitjCru4AcK8Z32hNXlccdY8hlFzTp,3.203254,5.682829,3.859569
+0,333YaK054AGlUYuw0XWxYn5K8NwzhfzJ3mm4YNwB1YXKjgnO64ZItBNaBRQoOgXn,0.124811,0.384592,0.257066
+0,+ZkQz7QrPZIODz45A+60ZFnG18jnyYlSY/IgEe1Yj8c4cU8h+L8WDIKMv2uB7EwD,1.022656,6.508863,3.368929
+0,+X4DW7zA6whRfOWSHHONJ1u3f0DyBvC9PqDmXGFfbxT4aUGCC6kVm6fuGu9IsQyL,3.428286,15.183059,5.743137
+0,2KXdN0Pb4iyu0jVPocTTf3dwk2Z1LjIlAcydV3HURGIUn1dTycCDDCHg5G6l6i9t,0.282044,0.40582,0.311669
+0,2lGxRtUbBrRZmIYagONMp6vj0zHk4EGhu0aSH5Ws/CAXwBNZpCavBFDNCEcPsOkt,3.662958,8.660027,5.281077
+0,+IR6CKA4zeO742dCx1l2hR0plhTanlaxPWAbckkZNo6UAti83TpYPRXrrfdmm9Ar,0.086237,2.450893,0.969819
+0,2/hWJ+i+1FSHiD44Rr3S4xWMUHC6hIgoVBX2XGZ7cOFyLn9FWQ3Kevsocw7CGaxJ,1.499537,2.832775,1.900258
+0,1WnALZnCvRlfqnuRyrIf0wxQOGLhGuvxInHelnMBM6cw9G9hydTBxqV60JSL/48p,0.717535,5.066802,1.448937
diff --git a/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv new file mode 100644 index 00000000..299c518c --- /dev/null +++ b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv @@ -0,0 +1,10 @@ +x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMTEWVObHTPRYEY81TYivZCMQ==,0,2591700,99.369869,3.4240942342446719,10.194309,Delay-insensitive,1,1.75
+H5CxmMoVcZSpjgGbohnVA3R+7uCTe/hM2ht2uIYi3t7KwXB4tkBxmZHBrt2A4x+n,BSXOcywx8pUU0DueDo6UMol1YzR6tn47KLEKaoXp0a1bf2PpzJ7n7lLlmhQ0OJf9,3J17LcV4gXjFat62qhVFRfoiWArHnY763HVqqI6orJCfV8h5j9yeotRMnCLlX1ooGkMyQ2MDOuY1oz111AGN9Q==,0,1539300,100,6.18178366757598,33.98136,Interactive,1,0.75
+wR/G1YUjpMP4zUbxGM/XJNhYS8cAK3SGKM2tqhF7VdeTUYHGktQiKQNoDTtYvnAc,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMT+hzuAPZnYJMu61JNhTDF/Q==,2188800,2591700,99.569027,3.5736346071428589,7.92425,Delay-insensitive,1,1.75
+1XiU+KpvIa3T1XP8kk3ZY71Of03+ogFL5Pag9Mc2jBuh0YqeW0Zcb9lepKLdPEDg,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.405085,16.2876105408034,95.69789,Delay-insensitive,8,56
+z5i2HiSaz6ZdLR6PXdnDjGva3jIlkMPXx23VtfXx9q3dXFRBQrxCOj7sHUsrmFLa,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMTEWVObHTPRYEY81TYivZCMQ==,0,2188500,98.967961,3.036037969572376,9.445484,Delay-insensitive,1,1.75
+n77nP00/UpJmT+Yx1ZkDphvAqPoHU8yUpDCwyUtPNlRENqvNp6Inya1eiy7VP1+x,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.448473,34.17401179027781,98.553018,Delay-insensitive,8,56
+aTSXW3N1KepxKYwKumd7T1+f7DkGolSKV8EArYAdctjD26YqSMKezCVSdvmSgqIQ,dBub/K+8I6jD9t2ExqUdRNlVxPPvDWqICA9Sr+yzcBZ/nNuC0W2swapPoBNIRoF+,C9GnRqFF2lzW/elUsLEwhyAQj9D/d5JIOOgvwfPL1aINf+m1f29G7nXhr6mRPGbiofmjfP9GkepcWz9LX5tp7Q==,2290500,2292300,94.113335,32.461745857142866,94.113335,Unkown,1,1.75
+uSkGH3DS6BVo3RFnw3GZb6WCFSmGgvgKi4HIj08yxO4f5ladUQc3pqDOtqRN0W9+,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.276369,1.3500837561060346,23.450372,Delay-insensitive,8,56
+ztRY/Sk5mrSFFcpy2usZ0YZZ7Eumq130/5BB8WVXfWaYvFkU+EhXUQ2kOFkCXuCw,dBub/K+8I6jD9t2ExqUdRNlVxPPvDWqICA9Sr+yzcBZ/nNuC0W2swapPoBNIRoF+,C9GnRqFF2lzW/elUsLEwhyAQj9D/d5JIOOgvwfPL1aINf+m1f29G7nXhr6mRPGbiofmjfP9GkepcWz9LX5tp7Q==,2281200,2300100,98.671595,43.724999781249991,98.13707,Unkown,1,1.75
+bJoIb8ras2ZNNSdAz3CAu4HYRd6k9MOqij/+6/+/5XaYw4+EoGdUEr74DCi974gJ,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.498748,18.989459534151351,94.751666,Interactive,8,56
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index 67140fe9..4a60dff3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.sv +package org.opendc.trace.bitbrains import org.opendc.trace.* import java.nio.file.Files @@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension /** * The resource state [Table] in the extended Bitbrains format. */ -internal class SvResourceStateTable(path: Path) : Table { +internal class BitbrainsExResourceStateTable(path: Path) : Table { /** * The partitions that belong to the table. */ @@ -50,7 +50,7 @@ internal class SvResourceStateTable(path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_CLUSTER_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -77,9 +77,9 @@ internal class SvResourceStateTable(path: Path) : Table { delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } @@ -118,7 +118,7 @@ internal class SvResourceStateTable(path: Path) : Table { return if (it.hasNext()) { val (_, path) = it.next() val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) + return BitbrainsExResourceStateTableReader(reader) } else { null } @@ -131,8 +131,8 @@ internal class SvResourceStateTable(path: Path) : Table { override fun newReader(partition: String): TableReader { val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) + return BitbrainsExResourceStateTableReader(reader) } - override fun toString(): String = "SvResourceStateTable" + override fun toString(): String = "BitbrainsExResourceStateTable" } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 6ea403fe..f1cf7307 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.sv +package org.opendc.trace.bitbrains import org.opendc.trace.* import java.io.BufferedReader @@ -29,7 +29,7 @@ import java.time.Instant /** * A [TableReader] for the Bitbrains resource state table. */ -internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { +internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader { override fun nextRow(): Boolean { reset() @@ -81,7 +81,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB } } @@ -93,7 +93,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : RESOURCE_STATE_ID -> true RESOURCE_STATE_CLUSTER_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -111,7 +111,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : RESOURCE_STATE_ID -> id RESOURCE_STATE_CLUSTER_ID -> cluster RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) @@ -134,7 +134,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt index dbd63de5..f16c493d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.sv +package org.opendc.trace.bitbrains import org.opendc.trace.* import java.nio.file.Path @@ -28,7 +28,7 @@ import java.nio.file.Path /** * [Trace] implementation for the extended Bitbrains format. */ -public class SvTrace internal constructor(private val path: Path) : Trace { +public class BitbrainsExTrace internal constructor(private val path: Path) : Trace { override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name @@ -38,8 +38,8 @@ public class SvTrace internal constructor(private val path: Path) : Trace { return null } - return SvResourceStateTable(path) + return BitbrainsExResourceStateTable(path) } - override fun toString(): String = "SvTrace[$path]" + override fun toString(): String = "BitbrainsExTrace[$path]" } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 0cce8559..06388a84 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.sv +package org.opendc.trace.bitbrains import org.opendc.trace.spi.TraceFormat import java.net.URL @@ -30,18 +30,18 @@ import kotlin.io.path.exists /** * A format implementation for the extended Bitbrains trace format. */ -public class SvTraceFormat : TraceFormat { +public class BitbrainsExTraceFormat : TraceFormat { /** * The name of this trace format. */ - override val name: String = "sv" + override val name: String = "bitbrains-ex" /** * Open the trace file. */ - override fun open(url: URL): SvTrace { + override fun open(url: URL): BitbrainsExTrace { val path = Paths.get(url.toURI()) require(path.exists()) { "URL $url does not exist" } - return SvTrace(path) + return BitbrainsExTrace(path) } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index c9e5954d..7241b18b 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -50,7 +50,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path override val columns: List<TableColumn<*>> = listOf( RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -78,9 +78,9 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index dab784c2..56e66f5c 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -115,7 +115,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return when (column) { RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -133,7 +133,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, val res: Any? = when (column) { RESOURCE_STATE_ID -> partition RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct @@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat index f18135d0..fd6a2180 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat +++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -1 +1,2 @@ org.opendc.trace.bitbrains.BitbrainsTraceFormat +org.opendc.trace.bitbrains.BitbrainsExTraceFormat diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt new file mode 100644 index 00000000..2e4f176a --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.net.URL + +/** + * Test suite for the [BitbrainsExTraceFormat] class. + */ +class BitbrainsExTraceFormatTest { + private val format = BitbrainsExTraceFormat() + + @Test + fun testTraceExists() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + assertDoesNotThrow { + format.open(url) + } + } + + @Test + fun testTraceDoesNotExists() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + assertThrows<IllegalArgumentException> { + format.open(URL(url.toString() + "help")) + } + } + + @Test + fun testTables() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + val trace = format.open(url) + + assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + } + + @Test + fun testTableExists() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + val trace = format.open(url) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testSmoke() { + val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(1631911500, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + ) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt b/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt new file mode 100644 index 00000000..28bebb0c --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt @@ -0,0 +1,2 @@ +1631911500 21.2 22.10 0.0 0.0 0.67 1.2 0.0 0.0 5 1 abc 1 0.01 1 10 0.0 0.0 2699 vm 4096 +1631911800 30.4 31.80 0.0 0.0 0.56 1.3 0.0 0.0 5 1 abc 1 0.02 1 10 0.0 0.0 2699 vm 4096 diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts new file mode 100644 index 00000000..b9c242a1 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for OpenDC-specific trace formats" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(projects.opendcTrace.opendcTraceParquet) + + testRuntimeOnly(libs.slf4j.simple) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index f051bf88..bee4ba7e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +package org.opendc.trace.opendc import org.apache.avro.generic.GenericRecord import org.opendc.trace.* @@ -28,9 +28,9 @@ import org.opendc.trace.util.parquet.LocalParquetReader import java.nio.file.Path /** - * The resource state [Table] in the Bitbrains Parquet format. + * The resource state [Table] in the OpenDC virtual machine trace format. */ -internal class BPResourceStateTable(private val path: Path) : Table { +internal class OdcVmResourceStateTable(private val path: Path) : Table { override val name: String = TABLE_RESOURCE_STATES override val isSynthetic: Boolean = false @@ -38,13 +38,13 @@ internal class BPResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) - return BPResourceStateTableReader(reader) + return OdcVmResourceStateTableReader(reader) } override fun newReader(partition: String): TableReader { diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 0e7ee555..df3bcfa6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -20,8 +20,9 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -29,16 +30,28 @@ import java.time.Duration import java.time.Instant /** - * A [TableReader] implementation for the Bitbrains Parquet format. + * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { /** * The current record. */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -47,7 +60,7 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } @@ -58,11 +71,11 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -76,9 +89,8 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader override fun getInt(column: TableColumn<Int>): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int + RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +102,7 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader override fun getDouble(column: TableColumn<Double>): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -99,5 +111,27 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader reader.close() } - override fun toString(): String = "BPResourceStateTableReader" + override fun toString(): String = "OdcVmResourceStateTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + COL_DURATION = schema.getField("duration").pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var COL_ID = -1 + private var COL_TIMESTAMP = -1 + private var COL_DURATION = -1 + private var COL_CPU_COUNT = -1 + private var COL_CPU_USAGE = -1 } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index 5b0f013f..b1456560 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +package org.opendc.trace.opendc import org.apache.avro.generic.GenericRecord import org.opendc.trace.* @@ -28,9 +28,9 @@ import org.opendc.trace.util.parquet.LocalParquetReader import java.nio.file.Path /** - * The resource [Table] in the Bitbrains Parquet format. + * The resource [Table] for the OpenDC virtual machine trace format. */ -internal class BPResourceTable(private val path: Path) : Table { +internal class OdcVmResourceTable(private val path: Path) : Table { override val name: String = TABLE_RESOURCES override val isSynthetic: Boolean = false @@ -38,13 +38,13 @@ internal class BPResourceTable(private val path: Path) : Table { RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) - return BPResourceTableReader(reader) + return OdcVmResourceTableReader(reader) } override fun newReader(partition: String): TableReader { diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 4416aae8..c52da62d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -20,24 +20,37 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Instant /** - * A [TableReader] implementation for the Bitbrains Parquet format. + * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format. */ -internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { /** * The current record. */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -46,7 +59,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -57,10 +70,10 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_ID -> record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_ID -> record[COL_ID].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -77,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_NCPUS -> record["maxCores"] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +103,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB + RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -99,5 +112,27 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene reader.close() } - override fun toString(): String = "BPResourceTableReader" + override fun toString(): String = "OdcVmResourceTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema") + } + } + + private var COL_ID = -1 + private var COL_START_TIME = -1 + private var COL_STOP_TIME = -1 + private var COL_CPU_COUNT = -1 + private var COL_MEM_CAPACITY = -1 } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt index 486587b1..3e5029b4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +package org.opendc.trace.opendc import org.opendc.trace.TABLE_RESOURCES import org.opendc.trace.TABLE_RESOURCE_STATES @@ -29,9 +29,9 @@ import org.opendc.trace.Trace import java.nio.file.Path /** - * A [Trace] in the Bitbrains Parquet format. + * A [Trace] in the OpenDC virtual machine trace format. */ -public class BPTrace internal constructor(private val path: Path) : Trace { +public class OdcVmTrace internal constructor(private val path: Path) : Trace { override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun containsTable(name: String): Boolean = @@ -39,11 +39,11 @@ public class BPTrace internal constructor(private val path: Path) : Trace { override fun getTable(name: String): Table? { return when (name) { - TABLE_RESOURCES -> BPResourceTable(path) - TABLE_RESOURCE_STATES -> BPResourceStateTable(path) + TABLE_RESOURCES -> OdcVmResourceTable(path) + TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) else -> null } } - override fun toString(): String = "BPTrace[$path]" + override fun toString(): String = "OdcVmTrace[$path]" } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt new file mode 100644 index 00000000..8edba725 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation of the OpenDC virtual machine trace format. + */ +public class OdcVmTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "opendc-vm" + + /** + * Open a Bitbrains Parquet trace. + */ + override fun open(url: URL): OdcVmTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return OdcVmTrace(path) + } + + public companion object { + /** + * Schema for the resources table in the trace. + */ + @JvmStatic + public val RESOURCES_SCHEMA: Schema = SchemaBuilder + .record("resource") + .namespace("org.opendc.trace.opendc") + .fields() + .requiredString("id") + .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() + .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("cpu_count") + .requiredLong("mem_capacity") + .endRecord() + + /** + * Schema for the resource states table in the trace. + */ + @JvmStatic + public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder + .record("resource_state") + .namespace("org.opendc.trace.opendc") + .fields() + .requiredString("id") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredLong("duration") + .requiredInt("cpu_count") + .requiredDouble("cpu_usage") + .endRecord() + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..94094af4 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.opendc.OdcVmTraceFormat diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt new file mode 100644 index 00000000..42eb369e --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [OdcVmTraceFormat] implementation. + */ +internal class OdcVmTraceFormatTest { + private val format = OdcVmTraceFormat() + + @Test + fun testTraceExists() { + val url = File("src/test/resources/trace-v2.1").toURI().toURL() + assertDoesNotThrow { format.open(url) } + } + + @Test + fun testTraceDoesNotExists() { + val url = File("src/test/resources/trace-v2.1").toURI().toURL() + assertThrows<IllegalArgumentException> { + format.open(URL(url.toString() + "help")) + } + } + + @Test + fun testTables() { + val url = File("src/test/resources/trace-v2.1").toURI().toURL() + val trace = format.open(url) + + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + } + + @Test + fun testTableExists() { + val url = File("src/test/resources/trace-v2.1").toURI().toURL() + val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val url = File("src/test/resources/trace-v2.1").toURI().toURL() + val trace = format.open(url) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testResources(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals("1023", reader.get(RESOURCE_ID)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals("1052", reader.get(RESOURCE_ID)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals("1073", reader.get(RESOURCE_ID)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testSmoke(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + ) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet Binary files differnew file mode 100644 index 00000000..d6ff09d8 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet Binary files differnew file mode 100644 index 00000000..5b6fa6b7 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet Binary files differnew file mode 100644 index 00000000..d8184945 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet Binary files differnew file mode 100644 index 00000000..00ab5835 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt index a4676f31..086b900b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt @@ -21,7 +21,7 @@ */ @file:JvmName("AvroUtils") -package org.opendc.experiments.capelin.export.parquet +package org.opendc.trace.util.parquet import org.apache.avro.LogicalTypes import org.apache.avro.Schema @@ -29,16 +29,16 @@ import org.apache.avro.Schema /** * Schema for UUID type. */ -internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) +public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) /** * Schema for timestamp type. */ -internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) +public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) /** * Helper function to make a [Schema] field optional. */ -internal fun Schema.optional(): Schema { +public fun Schema.optional(): Schema { return Schema.createUnion(Schema.create(Schema.Type.NULL), this) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-trace/opendc-trace-tools/build.gradle.kts index 49d5b4c5..35190dba 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -20,28 +20,28 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace.bp +description = "Tools for working with workload traces" -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +/* Build configuration */ +plugins { + `kotlin-conventions` + application +} -/** - * A format implementation for the GWF trace format. - */ -public class BPTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "bitbrains-parquet" +application { + mainClass.set("org.opendc.trace.tools.TraceConverterKt") +} + +dependencies { + api(platform(projects.opendcPlatform)) + + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceOpendc) + implementation(projects.opendcTrace.opendcTraceAzure) + implementation(projects.opendcTrace.opendcTraceBitbrains) + + implementation(libs.kotlin.logging) + implementation(libs.clikt) - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): BPTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BPTrace(path) - } + runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 1f3878eb..322464cd 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument @@ -34,15 +34,15 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA -import org.opendc.experiments.capelin.trace.sv.SvTraceFormat import org.opendc.trace.* +import org.opendc.trace.azure.AzureTraceFormat +import org.opendc.trace.bitbrains.BitbrainsExTraceFormat import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File import java.util.* +import kotlin.math.abs import kotlin.math.max import kotlin.math.min import kotlin.math.roundToLong @@ -50,12 +50,12 @@ import kotlin.math.roundToLong /** * A script to convert a trace in text format into a Parquet trace. */ -fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces */ -class TraceConverterCli : CliktCommand(name = "trace-converter") { +internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The logger instance for the converter. */ @@ -79,7 +79,7 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { */ private val format by option("-f", "--format", help = "input format of trace") .choice( - "solvinity" to SvTraceFormat(), + "solvinity" to BitbrainsExTraceFormat(), "bitbrains" to BitbrainsTraceFormat(), "azure" to AzureTraceFormat() ) @@ -106,23 +106,28 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Building resources table" } val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(BP_RESOURCES_SCHEMA) + .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA) .withCompressionCodec(CompressionCodecName.ZSTD) .enablePageWriteChecksum() .build() val selectedVms = metaWriter.use { convertResources(trace, it) } + if (selectedVms.isEmpty()) { + logger.warn { "No VMs selected" } + return + } + logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(BP_RESOURCE_STATES_SCHEMA) + .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) .withCompressionCodec(CompressionCodecName.ZSTD) - .enableDictionaryEncoding() - .enablePageWriteChecksum() + .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) .withBloomFilterNDV("id", selectedVms.size.toLong()) + .enableValidation() .build() val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } @@ -155,7 +160,7 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { @@ -170,13 +175,13 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { continue } - val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) + val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) builder["id"] = id - builder["submissionTime"] = startTime - builder["endTime"] = stopTime - builder["maxCores"] = numCpus - builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + builder["start_time"] = startTime + builder["stop_time"] = stopTime + builder["cpu_count"] = numCpus + builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() logger.info { "Selecting VM $id" } @@ -195,44 +200,58 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { var hasNextRow = reader.nextRow() var count = 0 + var lastId: String? = null + var lastTimestamp = 0L while (hasNextRow) { - var lastTimestamp = Long.MIN_VALUE + val id = reader.get(RESOURCE_STATE_ID) - do { - val id = reader.get(RESOURCE_STATE_ID) + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue + } - if (id !in selectedVms) { - hasNextRow = reader.nextRow() - continue - } + val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) - builder["id"] = id + val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + var timestamp = startTimestamp + var duration: Long - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L + // Check whether the previous entry is from a different VM + if (id != lastId) { + lastTimestamp = timestamp - 5 * 60 * 1000L + } + + do { + timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + + duration = timestamp - lastTimestamp + hasNextRow = reader.nextRow() + + if (!hasNextRow) { + break } - val duration = timestamp - lastTimestamp - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val flops = (cpuUsage * duration / 1000.0).roundToLong() + val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && + cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + } while (shouldContinue) - builder["time"] = timestamp - builder["duration"] = duration - builder["cores"] = cores - builder["cpuUsage"] = cpuUsage - builder["flops"] = flops + val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - writer.write(builder.build()) + builder["id"] = id + builder["timestamp"] = startTimestamp + builder["duration"] = duration + builder["cpu_count"] = cpuCount + builder["cpu_usage"] = cpuUsage - lastTimestamp = timestamp - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + writer.write(builder.build()) count++ + + lastId = id + lastTimestamp = timestamp } return count diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 483558e1..1b518fee 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,14 +28,12 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.compute.workload.* +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.topology.Topology +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel @@ -43,6 +41,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics @@ -50,12 +49,12 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration import org.opendc.web.client.model.Scenario -import org.opendc.web.client.model.Topology import java.io.File import java.net.URI import java.time.Duration import java.util.* import org.opendc.web.client.model.Portfolio as ClientPortfolio +import org.opendc.web.client.model.Topology as ClientTopology private val logger = KotlinLogging.logger {} @@ -129,18 +128,14 @@ class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> { val id = scenario.id logger.info { "Constructing performance interference model" } - val traceDir = File( - tracePath, - scenario.trace.traceId - ) - val traceReader = RawParquetTraceReader(traceDir) + val workloadLoader = ComputeWorkloadLoader(tracePath) val interferenceGroups = let { - val path = File(traceDir, "performance-interference-model.json") + val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -156,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, environment, traceReader, interferenceModel) + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) } } @@ -171,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") { private suspend fun runRepeat( scenario: Scenario, repeat: Int, - environment: EnvironmentReader, - traceReader: RawParquetTraceReader, + topology: Topology, + workloadLoader: ComputeWorkloadLoader, interferenceModel: VmInterferenceModel? ): WebComputeMonitor.Result { val monitor = WebComputeMonitor() @@ -186,23 +181,18 @@ class RunnerCli : CliktCommand(name = "runner") { val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) + val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction)) - val trace = ParquetTraceReader( - listOf(traceReader), - Workload(workloadName, workloadFraction), - repeat - ) val failureModel = if (operational.failuresEnabled) - grid5000(Duration.ofDays(7), repeat) + grid5000(Duration.ofDays(7)) else null - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, - environment.read(), failureModel, interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) @@ -210,7 +200,10 @@ class RunnerCli : CliktCommand(name = "runner") { val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1)) try { - simulator.run(trace) + // Instantiate the topology onto the simulator + simulator.apply(topology) + // Run workload trace + simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() metricReader.close() @@ -292,56 +285,62 @@ class RunnerCli : CliktCommand(name = "runner") { } /** - * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin. + * Convert the specified [topology] into an [Topology] understood by OpenDC. */ - private fun convert(topology: Topology): EnvironmentReader { - val nodes = mutableListOf<MachineDef>() - val random = Random(0) - - val machines = topology.rooms.asSequence() - .flatMap { room -> - room.tiles.flatMap { tile -> - tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() - } - } - for ((rack, machine) in machines) { - val clusterId = rack.id - val position = machine.position - - val processors = machine.cpus.flatMap { cpu -> - val cores = cpu.numberOfCores - val speed = cpu.clockRateMhz - // TODO Remove hard coding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.name, cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.memory.map { memory -> - MemoryUnit( - "Samsung", - memory.name, - memory.speedMbPerS, - memory.sizeMb.toLong() - ) - } + private fun convert(topology: ClientTopology): Topology { + return object : Topology { + + override fun resolve(): List<HostSpec> { + val res = mutableListOf<HostSpec>() + val random = Random(0) + + val machines = topology.rooms.asSequence() + .flatMap { room -> + room.tiles.flatMap { tile -> + tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() + } + } + for ((rack, machine) in machines) { + val clusterId = rack.id + val position = machine.position + + val processors = machine.cpus.flatMap { cpu -> + val cores = cpu.numberOfCores + val speed = cpu.clockRateMhz + // TODO Remove hard coding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.name, cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.memory.map { memory -> + MemoryUnit( + "Samsung", + memory.name, + memory.speedMbPerS, + memory.sizeMb.toLong() + ) + } - val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) + val powerDriver = SimplePowerDriver(powerModel) - nodes.add( - MachineDef( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf("cluster" to clusterId), - MachineModel(processors, memoryUnits), - LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) - ) - ) - } + val spec = HostSpec( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + MachineModel(processors, memoryUnits), + powerDriver + ) + + res += spec + } + + return res + } - return object : EnvironmentReader { - override fun read(): List<MachineDef> = nodes - override fun close() {} + override fun toString(): String = "WebRunnerTopologyFactory" } } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 933febe0..587f1cb2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -25,6 +25,7 @@ include(":opendc-platform") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") +include(":opendc-compute:opendc-compute-workload") include(":opendc-workflow:opendc-workflow-api") include(":opendc-workflow:opendc-workflow-service") include(":opendc-faas:opendc-faas-api") @@ -50,7 +51,10 @@ include(":opendc-trace:opendc-trace-swf") include(":opendc-trace:opendc-trace-wtf") include(":opendc-trace:opendc-trace-wfformat") include(":opendc-trace:opendc-trace-bitbrains") +include(":opendc-trace:opendc-trace-azure") +include(":opendc-trace:opendc-trace-opendc") include(":opendc-trace:opendc-trace-parquet") +include(":opendc-trace:opendc-trace-tools") include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") include(":opendc-harness:opendc-harness-cli") diff --git a/traces/bitbrains-small/meta.parquet b/traces/bitbrains-small/meta.parquet Binary files differindex 43f51cb8..da6e5330 100644 --- a/traces/bitbrains-small/meta.parquet +++ b/traces/bitbrains-small/meta.parquet diff --git a/traces/bitbrains-small/trace.parquet b/traces/bitbrains-small/trace.parquet Binary files differindex f4dd5a50..fe0a254c 100644 --- a/traces/bitbrains-small/trace.parquet +++ b/traces/bitbrains-small/trace.parquet |
