diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-21 11:34:34 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-21 11:34:34 +0200 |
| commit | 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (patch) | |
| tree | 73201888564accde4cfa107f4ffdb15e9f93d45c | |
| parent | c7fff03408ee3109d0a39a96c043584a2d8f67ca (diff) | |
feat(trace): Add support for writing traces
This change adds a new API for writing traces in a trace format.
Currently, writing is only supported by the OpenDC VM format, but over
time the other formats will also have support for writing added.
26 files changed, 596 insertions, 77 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index e82cf203..28a5e1da 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTrace.opendcTraceOpendc) + implementation(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt index 3b7c3f0f..c94f30e4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt @@ -21,7 +21,7 @@ */ @file:JvmName("ComputeSchedulers") -package org.opendc.experiments.capelin.util +package org.opendc.compute.workload import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler @@ -38,10 +38,10 @@ import java.util.* /** * Create a [ComputeScheduler] for the experiment. */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { +public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { val cpuAllocationRatio = 16.0 val ramAllocationRatio = 1.5 - return when (allocationPolicy) { + return when (name) { "mem" -> FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), weighers = listOf(RamWeigher(multiplier = 1.0)) @@ -80,7 +80,7 @@ fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacement subsetSize = Int.MAX_VALUE, random = Random(seeder.nextLong()) ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 6dba41e6..7c579e39 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -145,15 +145,15 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** - * Load the trace with the specified [name]. + * Load the trace with the specified [name] and [format]. */ - public fun get(name: String): List<VirtualMachine> { + public fun get(name: String, format: String): List<VirtualMachine> { return cache.computeIfAbsent(name) { val path = baseDir.resolve(it) logger.info { "Loading trace $it at $path" } - val trace = Trace.open(path, format = "opendc-vm") + val trace = Trace.open(path, format) val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt index f58ce587..2f4935ca 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -31,7 +31,7 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload /** * Construct a workload from a trace. */ -public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name) +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) /** * Construct a composite workload with the specified fractions. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index d657ff01..c20cb8f3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -30,8 +30,8 @@ import java.util.* /** * A [ComputeWorkload] from a trace. */ -internal class TraceComputeWorkload(val name: String) : ComputeWorkload { +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - return loader.get(name) + return loader.get(name, format) } } diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 4bcbaf61..23a3e4a7 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -33,8 +33,6 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) api(projects.opendcCompute.opendcComputeWorkload) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcCompute.opendcComputeSimulator) @@ -49,5 +47,7 @@ dependencies { implementation(kotlin("reflect")) implementation(libs.opentelemetry.semconv) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 630b76c4..2201a6b4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -26,6 +26,7 @@ import com.typesafe.config.ConfigFactory import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetExportMonitor import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply @@ -34,7 +35,6 @@ import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 031ee269..b0181cbc 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -45,4 +45,11 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader + + /** + * Open a [TableWriter] for this table. + * + * @throws UnsupportedOperationException if writing is not supported by the table. + */ + public fun newWriter(): TableWriter } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt new file mode 100644 index 00000000..423ce86a --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for writing workload traces. + */ +public interface TableWriter : AutoCloseable { + /** + * Start a new row in the table. + */ + public fun startRow() + + /** + * Flush the current row to the table. + */ + public fun endRow() + + /** + * Resolve the index of the specified [column] for this writer. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** + * Determine whether the [TableReader] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Set [column] to [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(index: Int, value: Any) + + /** + * Set [column] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(index: Int, value: Boolean) + + /** + * Set [column] to integer [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(index: Int, value: Int) + + /** + * Set [column] to long [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(index: Int, value: Long) + + /** + * Set [column] to double [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(index: Int, value: Double) + + /** + * Set [column] to [value]. + * + * @param column The column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value) + + /** + * Set [column] to boolean [value]. + * + * @param column The column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value) + + /** + * Set [column] to integer [value]. + * + * @param column The column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value) + + /** + * Set [column] to long [value]. + * + * @param column The column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value) + + /** + * Set [column] to double [value]. + * + * @param column The column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value) + + /** + * Flush any buffered content to the underlying target. + */ + public fun flush() + + /** + * Close the writer so that no more rows can be written. + */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 6d0014cb..64e8f272 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -51,21 +51,45 @@ public interface Trace { * Open a [Trace] at the specified [path] in the given [format]. * * @param path The path to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(path: File, format: String): Trace { - return open(path.toPath(), format) - } + @JvmStatic + public fun open(path: File, format: String): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. * * @param path The [Path] to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ + @JvmStatic public fun open(path: Path, format: String): Trace { val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } return TraceImpl(provider, path) } + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: File, format: String): Trace = create(path.toPath(), format) + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + provider.create(path) + return TraceImpl(provider, path) + } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index fd0a0f04..24551edb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -25,6 +25,7 @@ package org.opendc.trace.internal import org.opendc.trace.Table import org.opendc.trace.TableColumn import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.util.* /** @@ -44,6 +45,8 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) + override fun toString(): String = "Table[name=$name]" override fun hashCode(): Int = Objects.hash(trace, name) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index e04dd948..f2e610db 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -23,6 +23,7 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.nio.file.Path import java.util.* @@ -36,6 +37,15 @@ public interface TraceFormat { public val name: String /** + * Construct an empty trace at [path]. + * + * @param path The path where to create the empty trace. + * @throws IllegalArgumentException If [path] is invalid. + * @throws UnsupportedOperationException If the table does not support trace creation. + */ + public fun create(path: Path) + + /** * Return the name of the tables available in the trace at the specified [path]. * * @param path The path to the trace. @@ -64,6 +74,17 @@ public interface TraceFormat { public fun newReader(path: Path, table: String): TableReader /** + * Open a [TableWriter] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableWriter] for. + * @throws IllegalArgumentException If [table] does not exist. + * @throws UnsupportedOperationException If the format does not support writing. + * @return A [TableWriter] instance for the table. + */ + public fun newWriter(path: Path, table: String): TableWriter + + /** * A helper object for resolving providers. */ public companion object { diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 77af0d81..253c7057 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -50,6 +50,10 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -83,6 +87,10 @@ public class AzureTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all VM CPU readings. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 080b73de..20222c8a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -42,6 +42,10 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -74,6 +78,10 @@ public class BitbrainsExTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all resource state partitions. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 1573726f..3885c931 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -50,6 +50,10 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -90,6 +94,10 @@ public class BitbrainsTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all resource state partitions. */ diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 0f7b9d6e..d4287420 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -45,6 +45,10 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -71,4 +75,8 @@ public class GwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt new file mode 100644 index 00000000..15a8cb85 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Duration +import java.time.Instant + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceStateTableWriter( + private val writer: ParquetWriter<GenericRecord>, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource state schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + + val record = builder.build() + val id = record[COL_ID] as String + val timestamp = record[COL_TIMESTAMP] as Long + + check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + + writer.write(builder.build()) + + lastId = id + lastTimestamp = timestamp + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() + RESOURCE_STATE_DURATION -> schema.getField("duration").pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + + builder.set( + fields[index], + when (index) { + COL_TIMESTAMP -> (value as Instant).toEpochMilli() + COL_DURATION -> (value as Duration).toMillis() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Last column values that are used to check for correct partitioning. + */ + private var lastId: String? = null + private var lastTimestamp: Long = Long.MIN_VALUE + + /** + * Columns with special behavior. + */ + private val COL_ID = resolve(RESOURCE_ID) + private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) + private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt new file mode 100644 index 00000000..9cc6ca7d --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceTableWriter( + private val writer: ParquetWriter<GenericRecord>, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + writer.write(builder.build()) + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + builder.set( + fields[index], + when (index) { + COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() + COL_MEM_CAPACITY -> (value as Double).roundToLong() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Columns with special behavior. + */ + private val COL_START_TIME = resolve(RESOURCE_START_TIME) + private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) + private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 29818147..9b32f8fd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -25,11 +25,16 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecord +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.nio.file.Files import java.nio.file.Path /** @@ -41,6 +46,18 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" + override fun create(path: Path) { + // Construct directory containing the trace files + Files.createDirectory(path) + + val tables = getTables(path) + + for (table in tables) { + val writer = newWriter(path, table) + writer.close() + } + } + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -82,6 +99,32 @@ public class OdcVmTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + return when (table) { + TABLE_RESOURCES -> { + val schema = RESOURCES_SCHEMA + val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceTableWriter(writer, schema) + } + TABLE_RESOURCE_STATES -> { + val schema = RESOURCE_STATES_SCHEMA + val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceStateTableWriter(writer, schema) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + public companion object { /** * Schema for the resources table in the trace. diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 4cb7e49e..1fd076d5 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -36,6 +36,10 @@ import kotlin.io.path.bufferedReader public class SwfTraceFormat : TraceFormat { override val name: String = "swf" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -65,4 +69,8 @@ public class SwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 35190dba..14a0fc7c 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,19 +29,18 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverterKt") + mainClass.set("org.opendc.trace.tools.TraceConverter") } dependencies { api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceOpendc) - implementation(projects.opendcTrace.opendcTraceAzure) - implementation(projects.opendcTrace.opendcTraceBitbrains) - + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) + runtimeOnly(projects.opendcTrace.opendcTraceAzure) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index cd5d287f..6fad43be 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -20,6 +20,7 @@ * SOFTWARE. */ +@file:JvmName("TraceConverter") package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand @@ -29,25 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.math.abs import kotlin.math.max import kotlin.math.min -import kotlin.math.roundToLong /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces @@ -74,11 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The input format of the trace. */ - private val format by option("-f", "--format", help = "input format of trace") - .choice("bitbrains-ex", "bitbrains", "azure") + private val inputFormat by option("-f", "--input-format", help = "format of output trace") .required() /** + * The format of the output trace. + */ + private val outputFormat by option("--output-format", help = "format of output trace") + .default("opendc-vm") + + /** * The sampling options. */ private val samplingOptions by SamplingOptions().cooccurring() @@ -94,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = Trace.open(input, format = format) + val inputTrace = Trace.open(input, format = inputFormat) + val outputTrace = Trace.create(output, format = outputFormat) logger.info { "Building resources table" } - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() + val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() - val selectedVms = metaWriter.use { convertResources(trace, it) } + val selectedVms = metaWriter.use { convertResources(inputTrace, it) } if (selectedVms.isEmpty()) { logger.warn { "No VMs selected" } @@ -114,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .enableValidation() - .build() + val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) } logger.info { "Wrote $statesCount rows" } } /** * Convert the resources table for the trace. */ - private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> { + private fun convertResources(trace: Trace, writer: TableWriter): Set<String> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() @@ -168,18 +158,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { continue } - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) - - builder["id"] = id - builder["start_time"] = startTime - builder["stop_time"] = stopTime - builder["cpu_count"] = numCpus - builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) selectedVms.add(id) + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) + writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.setInt(RESOURCE_CPU_COUNT, numCpus) + writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.endRow() } return selectedVms @@ -188,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * Convert the resource states table for the trace. */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int { + private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() var hasNextRow = reader.nextRow() @@ -231,15 +219,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { cpuCount == reader.getInt(RESOURCE_CPU_COUNT) } while (shouldContinue) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - - builder["id"] = id - builder["timestamp"] = startTimestamp - builder["duration"] = duration - builder["cpu_count"] = cpuCount - builder["cpu_usage"] = cpuUsage - - writer.write(builder.build()) + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.endRow() count++ diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 825c3d6d..c75e3cbb 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -39,6 +39,10 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -64,4 +68,8 @@ public class WfFormatTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 2f17694f..ef88d295 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -35,6 +35,10 @@ import java.nio.file.Path public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -67,4 +71,8 @@ public class WtfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index bfbb1687..810f512f 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -36,10 +36,11 @@ application { dependencies { api(platform(projects.opendcPlatform)) implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcExperiments.opendcExperimentsCapelin) + implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) @@ -50,6 +51,7 @@ dependencies { implementation(libs.jackson.datatype.jsr310) implementation(kotlin("reflect")) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) runtimeOnly(libs.log4j.slf4j) testImplementation(libs.ktor.client.mock) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 1b518fee..40a7ea62 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -33,8 +33,6 @@ import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -181,7 +179,7 @@ class RunnerCli : CliktCommand(name = "runner") { val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) - val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction)) + val workload = trace(workloadName).sampleByLoad(workloadFraction) val failureModel = if (operational.failuresEnabled) @@ -203,7 +201,7 @@ class RunnerCli : CliktCommand(name = "runner") { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) + simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() metricReader.close() |
