From 859ce303f0b9110c7110b918e5957c2156fa8b26 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 17 Sep 2021 17:48:02 +0200 Subject: refactor(capelin): Extract common code out of Capelin experiments This change creates a new module for doing simulations with virtual machine workloads. We have found that a lot of code in the Capelin experiments code is being re-used by non-experiment modules. --- gradle/libs.versions.toml | 1 + .../opendc-compute-workload/build.gradle.kts | 49 ++++ .../compute/workload/ComputeWorkloadRunner.kt | 222 ++++++++++++++++++ .../org/opendc/compute/workload/FailureModel.kt | 38 +++ .../org/opendc/compute/workload/FailureModels.kt | 69 ++++++ .../org/opendc/compute/workload/env/MachineDef.kt | 38 +++ .../workload/export/parquet/ParquetDataWriter.kt | 145 ++++++++++++ .../export/parquet/ParquetExportMonitor.kt | 67 ++++++ .../export/parquet/ParquetHostDataWriter.kt | 104 ++++++++ .../export/parquet/ParquetServerDataWriter.kt | 97 ++++++++ .../export/parquet/ParquetServiceDataWriter.kt | 66 ++++++ .../workload/trace/RawParquetTraceReader.kt | 139 +++++++++++ .../workload/trace/StreamingParquetTraceReader.kt | 261 +++++++++++++++++++++ .../compute/workload/trace/TraceConverter.kt | 260 ++++++++++++++++++++ .../opendc/compute/workload/trace/TraceEntry.kt | 42 ++++ .../opendc/compute/workload/trace/TraceReader.kt | 32 +++ .../trace/azure/AzureResourceStateTable.kt | 127 ++++++++++ .../trace/azure/AzureResourceStateTableReader.kt | 149 ++++++++++++ .../workload/trace/azure/AzureResourceTable.kt | 54 +++++ .../trace/azure/AzureResourceTableReader.kt | 168 +++++++++++++ .../compute/workload/trace/azure/AzureTrace.kt | 46 ++++ .../workload/trace/azure/AzureTraceFormat.kt | 56 +++++ .../workload/trace/bp/BPResourceStateTable.kt | 53 +++++ .../trace/bp/BPResourceStateTableReader.kt | 103 ++++++++ .../compute/workload/trace/bp/BPResourceTable.kt | 53 +++++ .../workload/trace/bp/BPResourceTableReader.kt | 103 ++++++++ .../opendc/compute/workload/trace/bp/BPTrace.kt | 49 ++++ .../compute/workload/trace/bp/BPTraceFormat.kt | 47 ++++ .../opendc/compute/workload/trace/bp/Schemas.kt | 55 +++++ .../workload/trace/sv/SvResourceStateTable.kt | 138 +++++++++++ .../trace/sv/SvResourceStateTableReader.kt | 212 +++++++++++++++++ .../opendc/compute/workload/trace/sv/SvTrace.kt | 45 ++++ .../compute/workload/trace/sv/SvTraceFormat.kt | 47 ++++ .../workload/util/PerformanceInterferenceReader.kt | 68 ++++++ .../util/PerformanceInterferenceReaderTest.kt | 45 ++++ .../src/test/resources/perf-interference.json | 22 ++ .../opendc-experiments-capelin/build.gradle.kts | 11 +- .../org/opendc/experiments/capelin/Portfolio.kt | 21 +- .../capelin/env/ClusterEnvironmentReader.kt | 1 + .../experiments/capelin/env/EnvironmentReader.kt | 1 + .../opendc/experiments/capelin/env/MachineDef.kt | 38 --- .../capelin/export/parquet/AvroUtils.kt | 44 ---- .../capelin/export/parquet/ParquetDataWriter.kt | 145 ------------ .../capelin/export/parquet/ParquetExportMonitor.kt | 67 ------ .../export/parquet/ParquetHostDataWriter.kt | 101 -------- .../export/parquet/ParquetServerDataWriter.kt | 95 -------- .../export/parquet/ParquetServiceDataWriter.kt | 65 ----- .../capelin/trace/ParquetTraceReader.kt | 10 +- .../capelin/trace/PerformanceInterferenceReader.kt | 60 ----- .../capelin/trace/RawParquetTraceReader.kt | 139 ----------- .../capelin/trace/StreamingParquetTraceReader.kt | 261 --------------------- .../experiments/capelin/trace/TraceConverter.kt | 260 -------------------- .../opendc/experiments/capelin/trace/TraceEntry.kt | 44 ---- .../experiments/capelin/trace/TraceReader.kt | 32 --- .../experiments/capelin/trace/VmPlacementReader.kt | 47 ---- .../experiments/capelin/trace/WorkloadSampler.kt | 3 +- .../capelin/trace/azure/AzureResourceStateTable.kt | 127 ---------- .../trace/azure/AzureResourceStateTableReader.kt | 149 ------------ .../capelin/trace/azure/AzureResourceTable.kt | 54 ----- .../trace/azure/AzureResourceTableReader.kt | 169 ------------- .../experiments/capelin/trace/azure/AzureTrace.kt | 46 ---- .../capelin/trace/azure/AzureTraceFormat.kt | 56 ----- .../capelin/trace/bp/BPResourceStateTable.kt | 53 ----- .../capelin/trace/bp/BPResourceStateTableReader.kt | 103 -------- .../capelin/trace/bp/BPResourceTable.kt | 53 ----- .../capelin/trace/bp/BPResourceTableReader.kt | 103 -------- .../opendc/experiments/capelin/trace/bp/BPTrace.kt | 49 ---- .../experiments/capelin/trace/bp/BPTraceFormat.kt | 47 ---- .../opendc/experiments/capelin/trace/bp/Schemas.kt | 55 ----- .../capelin/trace/sv/SvResourceStateTable.kt | 138 ----------- .../capelin/trace/sv/SvResourceStateTableReader.kt | 212 ----------------- .../opendc/experiments/capelin/trace/sv/SvTrace.kt | 45 ---- .../experiments/capelin/trace/sv/SvTraceFormat.kt | 47 ---- .../capelin/util/ComputeServiceSimulator.kt | 222 ------------------ .../experiments/capelin/util/FailureModel.kt | 38 --- .../experiments/capelin/util/FailureModels.kt | 97 -------- .../experiments/capelin/util/VmPlacementReader.kt | 47 ++++ .../src/main/resources/log4j2.xml | 2 +- .../experiments/capelin/CapelinIntegrationTest.kt | 17 +- .../trace/PerformanceInterferenceReaderTest.kt | 45 ---- .../org/opendc/trace/util/parquet/AvroUtils.kt | 44 ++++ .../src/main/kotlin/org/opendc/web/runner/Main.kt | 12 +- settings.gradle.kts | 1 + 83 files changed, 3408 insertions(+), 3338 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/build.gradle.kts create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3f0e180b..11580338 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -53,6 +53,7 @@ progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" } # Format jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts new file mode 100644 index 00000000..390c7455 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating VM-based workloads with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcCompute.opendcComputeSimulator) + + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceBitbrains) + implementation(projects.opendcSimulator.opendcSimulatorCore) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(libs.opentelemetry.semconv) + + implementation(libs.kotlin.logging) + implementation(libs.clikt) + implementation(libs.jackson.databind) + implementation(libs.jackson.module.kotlin) + implementation(libs.jackson.dataformat.csv) + implementation(kotlin("reflect")) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt new file mode 100644 index 00000000..cc9f2705 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.env.MachineDef +import org.opendc.compute.workload.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to simulated VM-based workloads in OpenDC. + */ +public class ComputeWorkloadRunner( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + public val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + public val producers: List + get() = _metricProducers + private val _metricProducers = mutableListOf() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + */ + public suspend fun run(reader: TraceReader) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt new file mode 100644 index 00000000..43dd8321 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +public interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt new file mode 100644 index 00000000..55c61be1 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.compute.workload + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +public fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt new file mode 100644 index 00000000..c1695696 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.env + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map, + val model: MachineModel, + val powerModel: PowerModel +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..4172d729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalOutputFile +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + */ +public abstract class ParquetDataWriter( + path: File, + private val schema: Schema, + bufferSize: Int = 4096 +) : AutoCloseable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * An exception to be propagated to the actual writer. + */ + private var exception: Throwable? = null + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) + + /** + * Write the specified metrics to the database. + */ + public fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) + } + + /** + * Signal the writer to stop. + */ + override fun close() { + writerThread.interrupt() + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Process the specified [data] to be written to the Parquet file. + */ + private fun process(writer: ParquetWriter, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt new file mode 100644 index 00000000..f41a2241 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: ServerData) { + serverWriter.write(data) + } + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..37066a0d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.HostData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["host_id"] = data.host.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + + builder["cpu_count"] = data.host.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.host.memCapacity + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid + } + + override fun toString(): String = "host-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..bea23d32 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_count"] = data.server.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity + } + + override fun toString(): String = "server-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredLong("scheduling_latency") + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..47824b29 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecordBuilder +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.io.File + +/** + * A Parquet event writer for [ServiceData]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, SCHEMA, bufferSize) { + + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError + } + + override fun toString(): String = "service-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("service") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt new file mode 100644 index 00000000..ae20482d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace + +import org.opendc.compute.workload.trace.bp.BPTraceFormat +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.* +import java.io.File +import java.util.UUID + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +public class RawParquetTraceReader(private val path: File) { + /** + * The [Trace] that represents this trace. + */ + private val trace = BPTraceFormat().open(path.toURI().toURL()) + + /** + * Read the fragments into memory. + */ + private fun parseFragments(): Map> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + val fragments = mutableMapOf>() + + return try { + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + val time = reader.get(RESOURCE_STATE_TIMESTAMP) + val duration = reader.get(RESOURCE_STATE_DURATION) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + + val fragment = SimTraceWorkload.Fragment( + time.toEpochMilli(), + duration.toMillis(), + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(fragments: Map>): List> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + var counter = 0 + val entries = mutableListOf>() + + return try { + while (reader.nextRow()) { + + val id = reader.get(RESOURCE_ID) + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = reader.get(RESOURCE_START_TIME) + val endTime = reader.get(RESOURCE_STOP_TIME) + val maxCores = reader.getInt(RESOURCE_NCPUS) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs + val workload = SimTraceWorkload(vmFragments) + entries.add( + TraceEntry( + uid, id, submissionTime.toEpochMilli(), workload, + mapOf( + "submit-time" to submissionTime.toEpochMilli(), + "end-time" to endTime.toEpochMilli(), + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory.toLong(), + "workload" to workload + ) + ) + ) + } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: List> + + init { + val fragments = parseFragments() + entries = parseMeta(fragments) + } + + /** + * Read the entries in the trace. + */ + public fun read(): List> = entries +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt new file mode 100644 index 00000000..36cd0a7d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.util.parquet.LocalInputFile +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +/** + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. + * + * @param traceFile The directory of the traces. + * @param selectedVms The list of VMs to read from the trace. + */ +public class StreamingParquetTraceReader(traceFile: File, selectedVms: List = emptyList()) : TraceReader { + private val logger = KotlinLogging.logger {} + + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + ) + ) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "trace.parquet"))) + .disableCompatibility() + .withFilter(filter) + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val time = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + + val fragment = SimTraceWorkload.Fragment( + time, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() + + val metaReader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "meta.parquet"))) + .disableCompatibility() + .withFilter(filter) + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = selectedVms.ifEmpty { entries.keys } + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + logger.info { "Processing VM $id" } + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + var time = submissionTime + repeat@ while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + time += fragment.duration + if (time >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val workload = SimTraceWorkload(fragments) + val meta = mapOf( + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) + + TraceEntry(uid, id, submissionTime, workload, meta) + } + .sortedBy { it.start } + .toList() + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt new file mode 100644 index 00000000..bae7cb22 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workload.vm.trace + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.cooccurring +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.* +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.compute.workload.trace.azure.AzureTraceFormat +import org.opendc.compute.workload.trace.bp.BP_RESOURCES_SCHEMA +import org.opendc.compute.workload.trace.bp.BP_RESOURCE_STATES_SCHEMA +import org.opendc.compute.workload.trace.sv.SvTraceFormat +import org.opendc.trace.* +import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile +import java.io.File +import java.util.* +import kotlin.math.max +import kotlin.math.min +import kotlin.math.roundToLong + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +public fun main(args: Array): Unit = TraceConverterCli().main(args) + +/** + * Represents the command for converting traces + */ +internal class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + + /** + * The directory where the trace should be stored. + */ + private val output by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val input by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input format of the trace. + */ + private val format by option("-f", "--format", help = "input format of trace") + .choice( + "solvinity" to SvTraceFormat(), + "bitbrains" to BitbrainsTraceFormat(), + "azure" to AzureTraceFormat() + ) + .required() + + /** + * The sampling options. + */ + private val samplingOptions by SamplingOptions().cooccurring() + + override fun run() { + val metaParquet = File(output, "meta.parquet") + val traceParquet = File(output, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val trace = format.open(input.toURI().toURL()) + + logger.info { "Building resources table" } + + val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) + .withSchema(BP_RESOURCES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enablePageWriteChecksum() + .build() + + val selectedVms = metaWriter.use { convertResources(trace, it) } + + logger.info { "Wrote ${selectedVms.size} rows" } + logger.info { "Building resource states table" } + + val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) + .withSchema(BP_RESOURCE_STATES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enableDictionaryEncoding() + .enablePageWriteChecksum() + .withBloomFilterEnabled("id", true) + .withBloomFilterNDV("id", selectedVms.size.toLong()) + .build() + + val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + logger.info { "Wrote $statesCount rows" } + } + + /** + * Convert the resources table for the trace. + */ + private fun convertResources(trace: Trace, writer: ParquetWriter): Set { + val random = samplingOptions?.let { Random(it.seed) } + val samplingFraction = samplingOptions?.fraction ?: 1.0 + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var hasNextRow = reader.nextRow() + val selectedVms = mutableSetOf() + + while (hasNextRow) { + var id: String + var numCpus = Int.MIN_VALUE + var memCapacity = Double.MIN_VALUE + var memUsage = Double.MIN_VALUE + var startTime = Long.MAX_VALUE + var stopTime = Long.MIN_VALUE + + do { + id = reader.get(RESOURCE_STATE_ID) + + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + startTime = min(startTime, timestamp) + stopTime = max(stopTime, timestamp) + + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) + + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { + memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) + } + + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + + // Sample only a fraction of the VMs + if (random != null && random.nextDouble() > samplingFraction) { + continue + } + + val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) + + builder["id"] = id + builder["submissionTime"] = startTime + builder["endTime"] = stopTime + builder["maxCores"] = numCpus + builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + + logger.info { "Selecting VM $id" } + + writer.write(builder.build()) + selectedVms.add(id) + } + + return selectedVms + } + + /** + * Convert the resource states table for the trace. + */ + private fun convertResourceStates(trace: Trace, writer: ParquetWriter, selectedVms: Set): Int { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var hasNextRow = reader.nextRow() + var count = 0 + + while (hasNextRow) { + var lastTimestamp = Long.MIN_VALUE + + do { + val id = reader.get(RESOURCE_STATE_ID) + + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue + } + + val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) + builder["id"] = id + + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + if (lastTimestamp < 0) { + lastTimestamp = timestamp - 5 * 60 * 1000L + } + + val duration = timestamp - lastTimestamp + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val flops = (cpuUsage * duration / 1000.0).roundToLong() + + builder["time"] = timestamp + builder["duration"] = duration + builder["cores"] = cores + builder["cpuUsage"] = cpuUsage + builder["flops"] = flops + + writer.write(builder.build()) + + lastTimestamp = timestamp + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + + count++ + } + + return count + } + + /** + * Options for sampling the workload trace. + */ + private class SamplingOptions : OptionGroup() { + /** + * The fraction of VMs to sample + */ + val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") + .double() + .restrictTo(0.0001, 1.0) + .required() + + /** + * The seed for sampling the trace. + */ + val seed by option("--sampling-seed", help = "seed for sampling the workload") + .long() + .default(0) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt new file mode 100644 index 00000000..bfa2d051 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace + +import java.util.UUID + +/** + * An entry in a workload trace. + * + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. + */ +public data class TraceEntry( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt new file mode 100644 index 00000000..b0795d61 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace + +/** + * An interface for reading workloads into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt new file mode 100644 index 00000000..32843595 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] for the Azure v1 VM traces. + */ +internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override val columns: List> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ) + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun get(column: TableColumn): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (_, path) = it.next() + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } + + override fun toString(): String = "AzureResourceStateTable" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt new file mode 100644 index 00000000..ded9d4d6 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resource state table. + */ +internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) + "vm id" -> id = parser.text + "avg cpu" -> cpuUsagePct = parser.doubleValue + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> id + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var timestamp: Instant? = null + private var cpuUsagePct = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + id = null + timestamp = null + cpuUsagePct = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) + .addColumn("vm id", CsvSchema.ColumnType.STRING) + .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt new file mode 100644 index 00000000..2bed7725 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * The resource [Table] for the Azure v1 VM traces. + */ +internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = false + + override val columns: List> = listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_NCPUS, + RESOURCE_MEM_CAPACITY + ) + + override fun newReader(): TableReader { + return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("No partition $partition") + } + + override fun toString(): String = "AzureResourceTable" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt new file mode 100644 index 00000000..108ce4ed --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resources table. + */ +internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "vm id" -> id = parser.text + "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) + "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) + "vm virtual core count" -> cpuCores = parser.intValue + "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_STOP_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + RESOURCE_START_TIME -> startTime + RESOURCE_STOP_TIME -> stopTime + RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + RESOURCE_NCPUS -> cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_MEM_CAPACITY -> memCapacity + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var startTime: Instant? = null + private var stopTime: Instant? = null + private var cpuCores = -1 + private var memCapacity = Double.NaN + + /** + * Reset the state. + */ + fun reset() { + id = null + startTime = null + stopTime = null + cpuCores = -1 + memCapacity = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("vm id", CsvSchema.ColumnType.NUMBER) + .addColumn("subscription id", CsvSchema.ColumnType.STRING) + .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) + .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("vm category", CsvSchema.ColumnType.NUMBER) + .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) + .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt new file mode 100644 index 00000000..93c2210b --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the Azure v1 VM traces. + */ +public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = name in tables + + override fun getTable(name: String): Table? { + return when (name) { + TABLE_RESOURCES -> AzureResourceTable(factory, path) + TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) + else -> null + } + } + + override fun toString(): String = "AzureTrace[$path]" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt new file mode 100644 index 00000000..d400e1da --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the Azure v1 format. + */ +public class AzureTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "azure-v1" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Open the trace file. + */ + override fun open(url: URL): AzureTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return AzureTrace(factory, path) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt new file mode 100644 index 00000000..958ed49d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource state [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceStateTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCE_STATES + override val isSynthetic: Boolean = false + + override val columns: List> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_USAGE, + ) + + override fun newReader(): TableReader { + val reader = LocalParquetReader(path.resolve("trace.parquet")) + return BPResourceStateTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt new file mode 100644 index 00000000..655da2b6 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_DURATION -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_USAGE -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_STATE_ID -> record["id"].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) + RESOURCE_STATE_NCPUS -> record["cores"] + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_STATE_NCPUS -> record["cores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (column) { + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceStateTableReader" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt new file mode 100644 index 00000000..75782486 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + override val isSynthetic: Boolean = false + + override val columns: List> = listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_NCPUS, + RESOURCE_MEM_CAPACITY + ) + + override fun newReader(): TableReader { + val reader = LocalParquetReader(path.resolve("meta.parquet")) + return BPResourceTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt new file mode 100644 index 00000000..323ee9d0 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_STOP_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_ID -> record["id"].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_NCPUS -> record["maxCores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceTableReader" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt new file mode 100644 index 00000000..b4e64fab --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.opendc.trace.TABLE_RESOURCES +import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * A [Trace] in the Bitbrains Parquet format. + */ +public class BPTrace internal constructor(private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = + name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + + override fun getTable(name: String): Table? { + return when (name) { + TABLE_RESOURCES -> BPResourceTable(path) + TABLE_RESOURCE_STATES -> BPResourceStateTable(path) + else -> null + } + } + + override fun toString(): String = "BPTrace[$path]" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt new file mode 100644 index 00000000..9662476d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the GWF trace format. + */ +public class BPTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "bitbrains-parquet" + + /** + * Open a Bitbrains Parquet trace. + */ + override fun open(url: URL): BPTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return BPTrace(path) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt new file mode 100644 index 00000000..4f6dbce3 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.bp + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder + +/** + * Schema for the resources table in the trace. + */ +public val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder + .record("meta") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("submissionTime") + .requiredLong("endTime") + .requiredInt("maxCores") + .requiredLong("requiredMemory") + .endRecord() + +/** + * Schema for the resource states table in the trace. + */ +public val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder + .record("meta") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("time") + .requiredLong("duration") + .requiredInt("cores") + .requiredDouble("cpuUsage") + .requiredLong("flops") + .endRecord() diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt new file mode 100644 index 00000000..3ff69d36 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.sv + +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] in the extended Bitbrains format. + */ +internal class SvResourceStateTable(path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override val columns: List> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + ) + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun get(column: TableColumn): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (_, path) = it.next() + val reader = path.bufferedReader() + return SvResourceStateTableReader(reader) + } else { + null + } + } + + override fun toString(): String = "SvCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + val reader = path.bufferedReader() + return SvResourceStateTableReader(reader) + } + + override fun toString(): String = "SvResourceStateTable" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt new file mode 100644 index 00000000..487be950 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.sv + +import org.opendc.trace.* +import java.io.BufferedReader +import java.time.Instant + +/** + * A [TableReader] for the Bitbrains resource state table. + */ +internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { + override fun nextRow(): Boolean { + reset() + + var line: String + var num = 0 + + while (true) { + line = reader.readLine() ?: return false + num++ + + if (line[0] == '#' || line.isBlank()) { + // Ignore empty lines or comments + continue + } + + break + } + + line = line.trim() + + val length = line.length + var col = 0 + var start: Int + var end = 0 + + while (end < length) { + // Trim all whitespace before the field + start = end + while (start < length && line[start].isWhitespace()) { + start++ + } + + end = line.indexOf(' ', start) + + if (end < 0) { + end = length + } + + val field = line.subSequence(start, end) as String + when (col++) { + COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> cpuUsage = field.toDouble() + COL_CPU_DEMAND -> cpuDemand = field.toDouble() + COL_DISK_READ -> diskRead = field.toDouble() + COL_DISK_WRITE -> diskWrite = field.toDouble() + COL_CLUSTER_ID -> cluster = field.trim() + COL_NCPUS -> cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() + COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() + COL_ID -> id = field.trim() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_CPU_DEMAND -> true + RESOURCE_STATE_CPU_READY_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> id + RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) + RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) + RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + return when (column) { + RESOURCE_STATE_POWERED_ON -> poweredOn + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + RESOURCE_STATE_NCPUS -> cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + RESOURCE_STATE_CPU_DEMAND -> cpuDemand + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var cluster: String? = null + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuDemand = Double.NaN + private var cpuReadyPct = Double.NaN + private var memCapacity = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var poweredOn: Boolean = false + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false + } + + /** + * Default column indices for the extended Bitbrains format. + */ + private val COL_TIMESTAMP = 0 + private val COL_CPU_USAGE = 1 + private val COL_CPU_DEMAND = 2 + private val COL_DISK_READ = 4 + private val COL_DISK_WRITE = 6 + private val COL_CLUSTER_ID = 10 + private val COL_NCPUS = 12 + private val COL_CPU_READY_PCT = 13 + private val COL_POWERED_ON = 14 + private val COL_CPU_CAPACITY = 18 + private val COL_ID = 19 + private val COL_MEM_CAPACITY = 20 +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt new file mode 100644 index 00000000..932c73ab --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.sv + +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the extended Bitbrains format. + */ +public class SvTrace internal constructor(private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return SvResourceStateTable(path) + } + + override fun toString(): String = "SvTrace[$path]" +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt new file mode 100644 index 00000000..ba673b5f --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.trace.sv + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the extended Bitbrains trace format. + */ +public class SvTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "sv" + + /** + * Open the trace file. + */ + override fun open(url: URL): SvTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return SvTrace(path) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt new file mode 100644 index 00000000..67f9626c --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class PerformanceInterferenceReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + init { + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): List { + return mapper.readValue(file) + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): List { + return mapper.readValue(input) + } + + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..c79f0584 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val result = PerformanceInterferenceReader().read(input) + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 7dadd14d..010d18b0 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,6 +31,8 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) + api(projects.opendcCompute.opendcComputeWorkload) + implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) @@ -38,16 +40,13 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.opentelemetry.semconv) - implementation(libs.kotlin.logging) implementation(libs.config) - implementation(libs.progressbar) - implementation(libs.clikt) + implementation(libs.kotlin.logging) + implementation(libs.jackson.databind) implementation(libs.jackson.module.kotlin) - implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) + implementation(libs.opentelemetry.semconv) - implementation(libs.parquet) testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6261ebbf..06db5569 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,16 +24,17 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.env.ClusterEnvironmentReader -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -43,7 +44,6 @@ import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File -import java.io.FileInputStream import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -100,7 +100,12 @@ abstract class Portfolio(name: String) : Experiment(name) { */ override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader( + File( + config.getString("env-path"), + "${topology.name}.txt" + ) + ) val workload = workload val workloadNames = if (workload is CompositeWorkload) { @@ -117,7 +122,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() - .read(FileInputStream(config.getString("interference-model"))) + .read(File(config.getString("interference-model"))) .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -128,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) { grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) else null - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index babd8ada..8d9b24f4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin.env +import org.opendc.compute.workload.env.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt index a968b043..8d61c530 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin.env +import org.opendc.compute.workload.env.MachineDef import java.io.Closeable /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt deleted file mode 100644 index b0c0318f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.env - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* - -/** - * A definition of a machine in a cluster. - */ -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map, - val model: MachineModel, - val powerModel: PowerModel -) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt deleted file mode 100644 index a4676f31..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("AvroUtils") -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.LogicalTypes -import org.apache.avro.Schema - -/** - * Schema for UUID type. - */ -internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - -/** - * Schema for timestamp type. - */ -internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) - -/** - * Helper function to make a [Schema] field optional. - */ -internal fun Schema.optional(): Schema { - return Schema.createUnion(Schema.create(Schema.Type.NULL), this) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt deleted file mode 100644 index e3d15c3b..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * A writer that writes data in Parquet format. - */ -abstract class ParquetDataWriter( - path: File, - private val schema: Schema, - bufferSize: Int = 4096 -) : AutoCloseable { - /** - * The logging instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) - - /** - * An exception to be propagated to the actual writer. - */ - private var exception: Throwable? = null - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = this.toString()) { - val writer = let { - val builder = AvroParquetWriter.builder(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - buildWriter(builder) - } - - val queue = queue - val buf = mutableListOf() - var shouldStop = false - - try { - while (!shouldStop) { - try { - process(writer, queue.take()) - } catch (e: InterruptedException) { - shouldStop = true - } - - if (queue.drainTo(buf) > 0) { - for (data in buf) { - process(writer, data) - } - buf.clear() - } - } - } catch (e: Throwable) { - logger.error(e) { "Failure in Parquet data writer" } - exception = e - } finally { - writer.close() - } - } - - /** - * Build the [ParquetWriter] used to write the Parquet files. - */ - protected open fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { - return builder.build() - } - - /** - * Convert the specified [data] into a Parquet record. - */ - protected abstract fun convert(builder: GenericRecordBuilder, data: T) - - /** - * Write the specified metrics to the database. - */ - fun write(data: T) { - val exception = exception - if (exception != null) { - throw IllegalStateException("Writer thread failed", exception) - } - - queue.put(data) - } - - /** - * Signal the writer to stop. - */ - override fun close() { - writerThread.interrupt() - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt deleted file mode 100644 index b057e932..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. - */ -class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(data: ServerData) { - serverWriter.write(data) - } - - override fun record(data: HostData) { - hostWriter.write(data) - } - - override fun record(data: ServiceData) { - serviceWriter.write(data) - } - - override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 58388cb1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData -import java.io.File - -/** - * A Parquet event writer for [HostData]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { - - override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun convert(builder: GenericRecordBuilder, data: HostData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - - builder["host_id"] = data.host.id - - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } - - builder["cpu_count"] = data.host.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime - - builder["mem_limit"] = data.host.memCapacity - - builder["power_total"] = data.powerTotal - - builder["guests_terminated"] = data.guestsTerminated - builder["guests_running"] = data.guestsRunning - builder["guests_error"] = data.guestsError - builder["guests_invalid"] = data.guestsInvalid - } - - override fun toString(): String = "host-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("host") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .requiredDouble("power_total") - .requiredInt("guests_terminated") - .requiredInt("guests_running") - .requiredInt("guests_error") - .requiredInt("guests_invalid") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index 43b5f469..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData -import java.io.File -import java.util.* - -/** - * A Parquet event writer for [ServerData]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { - - override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun convert(builder: GenericRecordBuilder, data: ServerData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - - builder["server_id"] = data.server.id - builder["host_id"] = data.host?.id - - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } - builder["scheduling_latency"] = data.schedulingLatency - - builder["cpu_count"] = data.server.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime - - builder["mem_limit"] = data.server.memCapacity - } - - override fun toString(): String = "server-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("server") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("server_id").type(UUID_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA.optional()).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredLong("scheduling_latency") - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index 2928f445..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData -import java.io.File - -/** - * A Parquet event writer for [ServiceData]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { - - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - builder["hosts_up"] = data.hostsUp - builder["hosts_down"] = data.hostsDown - builder["servers_pending"] = data.serversPending - builder["servers_active"] = data.serversActive - builder["attempts_success"] = data.attemptsSuccess - builder["attempts_failure"] = data.attemptsFailure - builder["attempts_error"] = data.attemptsError - } - - override fun toString(): String = "service-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("service") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("hosts_up") - .requiredInt("hosts_down") - .requiredInt("servers_pending") - .requiredInt("servers_active") - .requiredInt("attempts_success") - .requiredInt("attempts_failure") - .requiredInt("attempts_error") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 0bf4ada6..498636ba 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,6 +22,9 @@ package org.opendc.experiments.capelin.trace +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.trace.TraceEntry +import org.opendc.compute.workload.trace.TraceReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.simulator.compute.workload.SimWorkload @@ -51,7 +54,10 @@ public class ParquetTraceReader( this.zip(listOf(workload)) } } - .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry::start) } + .flatMap { + sampleWorkload(it.first, workload, it.second, seed) + .sortedBy(TraceEntry::start) + } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt deleted file mode 100644 index 9549af42..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from the input. - */ - fun read(input: InputStream): List { - return input.use { mapper.readValue(input) } - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set, - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt deleted file mode 100644 index ca937328..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.experiments.capelin.trace.bp.BPTraceFormat -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.* -import java.io.File -import java.util.UUID - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -class RawParquetTraceReader(private val path: File) { - /** - * The [Trace] that represents this trace. - */ - private val trace = BPTraceFormat().open(path.toURI().toURL()) - - /** - * Read the fragments into memory. - */ - private fun parseFragments(): Map> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - val fragments = mutableMapOf>() - - return try { - while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) - val time = reader.get(RESOURCE_STATE_TIMESTAMP) - val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - - val fragment = SimTraceWorkload.Fragment( - time.toEpochMilli(), - duration.toMillis(), - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(fragments: Map>): List> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - - var counter = 0 - val entries = mutableListOf>() - - return try { - while (reader.nextRow()) { - - val id = reader.get(RESOURCE_ID) - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) - entries.add( - TraceEntry( - uid, id, submissionTime.toEpochMilli(), workload, - mapOf( - "submit-time" to submissionTime.toEpochMilli(), - "end-time" to endTime.toEpochMilli(), - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory.toLong(), - "workload" to workload - ) - ) - ) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - reader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List> - - init { - val fragments = parseFragments() - entries = parseMeta(fragments) - } - - /** - * Read the entries in the trace. - */ - fun read(): List> = entries -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt deleted file mode 100644 index ed82217d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.util.parquet.LocalInputFile -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param selectedVms The list of VMs to read from the trace. - */ -class StreamingParquetTraceReader(traceFile: File, selectedVms: List = emptyList()) : TraceReader { - private val logger = KotlinLogging.logger {} - - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader - .builder(LocalInputFile(File(traceFile, "trace.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - time, - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf() - val entries = mutableMapOf() - val buffers = mutableMapOf>>() - - val metaReader = AvroParquetReader - .builder(LocalInputFile(File(traceFile, "meta.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = selectedVms.ifEmpty { entries.keys } - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info { "Processing VM $id" } - - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry(uid, id, submissionTime, workload, meta) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt deleted file mode 100644 index 1f3878eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.* -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA -import org.opendc.experiments.capelin.trace.sv.SvTraceFormat -import org.opendc.trace.* -import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.File -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.math.roundToLong - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -fun main(args: Array): Unit = TraceConverterCli().main(args) - -/** - * Represents the command for converting traces - */ -class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The logger instance for the converter. - */ - private val logger = KotlinLogging.logger {} - - /** - * The directory where the trace should be stored. - */ - private val output by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val input by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input format of the trace. - */ - private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to SvTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) - .required() - - /** - * The sampling options. - */ - private val samplingOptions by SamplingOptions().cooccurring() - - override fun run() { - val metaParquet = File(output, "meta.parquet") - val traceParquet = File(output, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val trace = format.open(input.toURI().toURL()) - - logger.info { "Building resources table" } - - val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) - .withSchema(BP_RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() - - val selectedVms = metaWriter.use { convertResources(trace, it) } - - logger.info { "Wrote ${selectedVms.size} rows" } - logger.info { "Building resource states table" } - - val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) - .withSchema(BP_RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enableDictionaryEncoding() - .enablePageWriteChecksum() - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .build() - - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } - logger.info { "Wrote $statesCount rows" } - } - - /** - * Convert the resources table for the trace. - */ - private fun convertResources(trace: Trace, writer: ParquetWriter): Set { - val random = samplingOptions?.let { Random(it.seed) } - val samplingFraction = samplingOptions?.fraction ?: 1.0 - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - var hasNextRow = reader.nextRow() - val selectedVms = mutableSetOf() - - while (hasNextRow) { - var id: String - var numCpus = Int.MIN_VALUE - var memCapacity = Double.MIN_VALUE - var memUsage = Double.MIN_VALUE - var startTime = Long.MAX_VALUE - var stopTime = Long.MIN_VALUE - - do { - id = reader.get(RESOURCE_STATE_ID) - - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - startTime = min(startTime, timestamp) - stopTime = max(stopTime, timestamp) - - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) - - memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) - if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { - memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) - } - - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) - - // Sample only a fraction of the VMs - if (random != null && random.nextDouble() > samplingFraction) { - continue - } - - val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) - - builder["id"] = id - builder["submissionTime"] = startTime - builder["endTime"] = stopTime - builder["maxCores"] = numCpus - builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() - - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) - selectedVms.add(id) - } - - return selectedVms - } - - /** - * Convert the resource states table for the trace. - */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter, selectedVms: Set): Int { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - var hasNextRow = reader.nextRow() - var count = 0 - - while (hasNextRow) { - var lastTimestamp = Long.MIN_VALUE - - do { - val id = reader.get(RESOURCE_STATE_ID) - - if (id !in selectedVms) { - hasNextRow = reader.nextRow() - continue - } - - val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) - builder["id"] = id - - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - } - - val duration = timestamp - lastTimestamp - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val flops = (cpuUsage * duration / 1000.0).roundToLong() - - builder["time"] = timestamp - builder["duration"] = duration - builder["cores"] = cores - builder["cpuUsage"] = cpuUsage - builder["flops"] = flops - - writer.write(builder.build()) - - lastTimestamp = timestamp - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) - - count++ - } - - return count - } - - /** - * Options for sampling the workload trace. - */ - private class SamplingOptions : OptionGroup() { - /** - * The fraction of VMs to sample - */ - val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") - .double() - .restrictTo(0.0001, 1.0) - .required() - - /** - * The seed for sampling the trace. - */ - val seed by option("--sampling-seed", help = "seed for sampling the workload") - .long() - .default(0) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt deleted file mode 100644 index 303a6a8c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import java.util.UUID - -/** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. - */ -public data class TraceEntry( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map -) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt deleted file mode 100644 index 08304edc..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -/** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. - */ -public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt deleted file mode 100644 index b55bd577..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import java.io.InputStream - -/** - * A parser for the JSON VM placement data files used for the TPDS article on Capelin. - */ -class VmPlacementReader { - /** - * The [ObjectMapper] to parse the placement. - */ - private val mapper = jacksonObjectMapper() - - /** - * Read the VM placements from the input. - */ - fun read(input: InputStream): Map { - return mapper.readValue>(input) - .mapKeys { "vm__workload__${it.key}.txt" } - .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index cb32ce88..b42951df 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,6 +23,7 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging +import org.opendc.compute.workload.trace.TraceEntry import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index f98f4b2c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - } - - this.delegate = delegate - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun get(column: TableColumn): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt deleted file mode 100644 index f80c0e82..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.* -import java.time.Instant - -/** - * A [TableReader] for the Azure v1 VM resource state table. - */ -internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) - "vm id" -> id = parser.text - "avg cpu" -> cpuUsagePct = parser.doubleValue - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - else -> false - } - } - - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(column: TableColumn): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var timestamp: Instant? = null - private var cpuUsagePct = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - id = null - timestamp = null - cpuUsagePct = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) - .addColumn("vm id", CsvSchema.ColumnType.STRING) - .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt deleted file mode 100644 index c9d4f7eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY - ) - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt deleted file mode 100644 index b712b854..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.apache.parquet.example.Paper.schema -import org.opendc.trace.* -import java.time.Instant - -/** - * A [TableReader] for the Azure v1 VM resources table. - */ -internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "vm id" -> id = parser.text - "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) - "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) - "vm virtual core count" -> cpuCores = parser.intValue - "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } - } - - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_START_TIME -> startTime - RESOURCE_STOP_TIME -> stopTime - RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn): Int { - return when (column) { - RESOURCE_NCPUS -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_MEM_CAPACITY -> memCapacity - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var startTime: Instant? = null - private var stopTime: Instant? = null - private var cpuCores = -1 - private var memCapacity = Double.NaN - - /** - * Reset the state. - */ - fun reset() { - id = null - startTime = null - stopTime = null - cpuCores = -1 - memCapacity = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("vm id", CsvSchema.ColumnType.NUMBER) - .addColumn("subscription id", CsvSchema.ColumnType.STRING) - .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) - .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("vm category", CsvSchema.ColumnType.NUMBER) - .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) - .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt deleted file mode 100644 index 24c60bab..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt deleted file mode 100644 index 744e43a0..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the Azure v1 format. - */ -class AzureTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "azure-v1" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - /** - * Open the trace file. - */ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt deleted file mode 100644 index f051bf88..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the Bitbrains Parquet format. - */ -internal class BPResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, - RESOURCE_STATE_CPU_USAGE, - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("trace.parquet")) - return BPResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt deleted file mode 100644 index 0e7ee555..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant - -/** - * A [TableReader] implementation for the Bitbrains Parquet format. - */ -internal class BPResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { - /** - * The current record. - */ - private var record: GenericRecord? = null - - override fun nextRow(): Boolean { - record = reader.read() - return record != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } - } - - override fun get(column: TableColumn): T { - val record = checkNotNull(record) { "Reader in invalid state" } - - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "BPResourceStateTableReader" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt deleted file mode 100644 index 5b0f013f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] in the Bitbrains Parquet format. - */ -internal class BPResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("meta.parquet")) - return BPResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt deleted file mode 100644 index 4416aae8..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Instant - -/** - * A [TableReader] implementation for the Bitbrains Parquet format. - */ -internal class BPResourceTableReader(private val reader: LocalParquetReader) : TableReader { - /** - * The current record. - */ - private var record: GenericRecord? = null - - override fun nextRow(): Boolean { - record = reader.read() - return record != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } - } - - override fun get(column: TableColumn): T { - val record = checkNotNull(record) { "Reader in invalid state" } - - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_NCPUS -> record["maxCores"] as Int - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "BPResourceTableReader" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt deleted file mode 100644 index 486587b1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * A [Trace] in the Bitbrains Parquet format. - */ -public class BPTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> BPResourceTable(path) - TABLE_RESOURCE_STATES -> BPResourceStateTable(path) - else -> null - } - } - - override fun toString(): String = "BPTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt deleted file mode 100644 index 49d5b4c5..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the GWF trace format. - */ -public class BPTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "bitbrains-parquet" - - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): BPTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BPTrace(path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt deleted file mode 100644 index 7dd8161d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder - -/** - * Schema for the resources table in the trace. - */ -val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("submissionTime") - .requiredLong("endTime") - .requiredInt("maxCores") - .requiredLong("requiredMemory") - .endRecord() - -/** - * Schema for the resource states table in the trace. - */ -val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("time") - .requiredLong("duration") - .requiredInt("cores") - .requiredDouble("cpuUsage") - .requiredLong("flops") - .endRecord() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt deleted file mode 100644 index 67140fe9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class SvResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, - RESOURCE_STATE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_STATE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - } - - this.delegate = delegate - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun get(column: TableColumn): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) - } - - override fun toString(): String = "SvResourceStateTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt deleted file mode 100644 index 6ea403fe..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.io.BufferedReader -import java.time.Instant - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - override fun nextRow(): Boolean { - reset() - - var line: String - var num = 0 - - while (true) { - line = reader.readLine() ?: return false - num++ - - if (line[0] == '#' || line.isBlank()) { - // Ignore empty lines or comments - continue - } - - break - } - - line = line.trim() - - val length = line.length - var col = 0 - var start: Int - var end = 0 - - while (end < length) { - // Trim all whitespace before the field - start = end - while (start < length && line[start].isWhitespace()) { - start++ - } - - end = line.indexOf(' ', start) - - if (end < 0) { - end = length - } - - val field = line.subSequence(start, end) as String - when (col++) { - COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> cpuUsage = field.toDouble() - COL_CPU_DEMAND -> cpuDemand = field.toDouble() - COL_DISK_READ -> diskRead = field.toDouble() - COL_DISK_WRITE -> diskWrite = field.toDouble() - COL_CLUSTER_ID -> cluster = field.trim() - COL_NCPUS -> cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() - COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() - COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_CLUSTER_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_CPU_DEMAND -> true - RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - else -> false - } - } - - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_CLUSTER_ID -> cluster - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) - RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) - RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) - RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) - RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) - RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn): Boolean { - return when (column) { - RESOURCE_STATE_POWERED_ON -> poweredOn - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getInt(column: TableColumn): Int { - return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity - RESOURCE_STATE_CPU_DEMAND -> cpuDemand - RESOURCE_STATE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var cluster: String? = null - private var timestamp: Instant? = null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuDemand = Double.NaN - private var cpuReadyPct = Double.NaN - private var memCapacity = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var poweredOn: Boolean = false - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } - - /** - * Default column indices for the extended Bitbrains format. - */ - private val COL_TIMESTAMP = 0 - private val COL_CPU_USAGE = 1 - private val COL_CPU_DEMAND = 2 - private val COL_DISK_READ = 4 - private val COL_DISK_WRITE = 6 - private val COL_CLUSTER_ID = 10 - private val COL_NCPUS = 12 - private val COL_CPU_READY_PCT = 13 - private val COL_POWERED_ON = 14 - private val COL_CPU_CAPACITY = 18 - private val COL_ID = 19 - private val COL_MEM_CAPACITY = 20 -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt deleted file mode 100644 index dbd63de5..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class SvTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return SvResourceStateTable(path) - } - - override fun toString(): String = "SvTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt deleted file mode 100644 index 0cce8559..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the extended Bitbrains trace format. - */ -public class SvTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "sv" - - /** - * Open the trace file. - */ - override fun open(url: URL): SvTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SvTrace(path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt deleted file mode 100644 index 065a8c93..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.util - -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.env.MachineDef -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.max - -/** - * Helper class to manage a [ComputeService] simulation. - */ -class ComputeServiceSimulator( - private val context: CoroutineContext, - private val clock: Clock, - scheduler: ComputeScheduler, - machines: List, - private val failureModel: FailureModel? = null, - interferenceModel: VmInterferenceModel? = null, - hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() -) : AutoCloseable { - /** - * The [ComputeService] that has been configured by the manager. - */ - val service: ComputeService - - /** - * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. - */ - val producers: List - get() = _metricProducers - private val _metricProducers = mutableListOf() - - /** - * The [SimResourceInterpreter] to simulate the hosts. - */ - private val interpreter = SimResourceInterpreter(context, clock) - - /** - * The hosts that belong to this class. - */ - private val hosts = mutableSetOf() - - init { - val (service, serviceMeterProvider) = createService(scheduler) - this._metricProducers.add(serviceMeterProvider) - this.service = service - - for (def in machines) { - val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) - this._metricProducers.add(hostMeterProvider) - hosts.add(host) - this.service.addHost(host) - } - } - - /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. - */ - suspend fun run(reader: TraceReader) { - val injector = failureModel?.createInjector(context, clock, service) - val client = service.newClient() - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - injector?.start() - - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - injector?.close() - reader.close() - client.close() - } - } - - override fun close() { - service.close() - - for (host in hosts) { - host.close() - } - - hosts.clear() - } - - /** - * Construct a [ComputeService] instance. - */ - private fun createService(scheduler: ComputeScheduler): Pair { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val service = ComputeService(context, clock, meterProvider, scheduler) - return service to meterProvider - } - - /** - * Construct a [SimHost] instance for the specified [MachineDef]. - */ - private fun createHost( - def: MachineDef, - hypervisorProvider: SimHypervisorProvider, - interferenceModel: VmInterferenceModel? = null - ): Pair { - val resource = Resource.builder() - .put(HOST_ID, def.uid.toString()) - .put(HOST_NAME, def.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, def.model.cpus.size) - .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val host = SimHost( - def.uid, - def.name, - def.model, - def.meta, - context, - interpreter, - meterProvider, - hypervisorProvider, - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - - return host to meterProvider - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt deleted file mode 100644 index 83393896..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.util - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.failure.HostFaultInjector -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. - */ -interface FailureModel { - /** - * Construct a [HostFaultInjector] for the specified [service]. - */ - fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt deleted file mode 100644 index 89b4a31c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("FailureModels") -package org.opendc.experiments.capelin - -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.util.FailureModel -import java.time.Clock -import java.time.Duration -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.random.Random - -/** - * Obtain a [FailureModel] based on the GRID'5000 failure trace. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -fun grid5000(failureInterval: Duration, seed: Int): FailureModel { - return object : FailureModel { - override fun createInjector( - context: CoroutineContext, - clock: Clock, - service: ComputeService - ): HostFaultInjector { - val rng = Well19937c(seed) - val hosts = service.hosts.map { it as SimHost }.toSet() - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) - } - - override fun toString(): String = "Grid5000FailureModel" - } -} - -/** - * Obtain the [HostFaultInjector] to use for the experiments. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt new file mode 100644 index 00000000..67de2777 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream + +/** + * A parser for the JSON VM placement data files used for the TPDS article on Capelin. + */ +class VmPlacementReader { + /** + * The [ObjectMapper] to parse the placement. + */ + private val mapper = jacksonObjectMapper() + + /** + * Read the VM placements from the input. + */ + fun read(input: InputStream): Map { + return mapper.readValue>(input) + .mapKeys { "vm__workload__${it.key}.txt" } + .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml index d1c01b8e..d46b50c3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml +++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml @@ -36,7 +36,7 @@ - + diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 727530e3..b0f86346 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,14 +31,15 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.trace.TraceReader +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -85,7 +86,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -134,7 +135,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -184,7 +185,7 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -229,7 +230,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt deleted file mode 100644 index fbc39b87..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll - -/** - * Test suite for the [PerformanceInterferenceReader] class. - */ -class PerformanceInterferenceReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) - } -} diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt new file mode 100644 index 00000000..086b900b --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("AvroUtils") +package org.opendc.trace.util.parquet + +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema + +/** + * Schema for UUID type. + */ +public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + +/** + * Schema for timestamp type. + */ +public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. + */ +public fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 483558e1..497a7281 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,14 +28,14 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.experiments.capelin.* +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.env.MachineDef +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.env.MachineDef import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel @@ -198,7 +198,7 @@ class RunnerCli : CliktCommand(name = "runner") { else null - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, diff --git a/settings.gradle.kts b/settings.gradle.kts index 933febe0..7bfc8fc6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -25,6 +25,7 @@ include(":opendc-platform") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") +include(":opendc-compute:opendc-compute-workload") include(":opendc-workflow:opendc-workflow-api") include(":opendc-workflow:opendc-workflow-service") include(":opendc-faas:opendc-faas-api") -- cgit v1.2.3