diff options
19 files changed, 91 insertions, 685 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 47f5f71e..06251dd3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -41,10 +41,8 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload @@ -53,7 +51,6 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.io.File import java.time.Clock import kotlin.coroutines.resume import kotlin.math.ln @@ -68,7 +65,7 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public fun createFailureDomain( +fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, @@ -100,7 +97,7 @@ public fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. 
*/ -public fun createFaultInjector( +fun createFaultInjector( coroutineScope: CoroutineScope, clock: Clock, random: Random, @@ -119,26 +116,9 @@ public fun createFaultInjector( } /** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List<String>, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** * Construct the environment for a simulated compute service.. */ -public suspend fun withComputeService( +suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, @@ -182,15 +162,13 @@ public suspend fun withComputeService( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun withMonitor( +suspend fun withMonitor( monitor: ExperimentMonitor, clock: Clock, metricProducer: MetricProducer, scheduler: ComputeService, block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { - val monitorJobs = mutableSetOf<Job>() - // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -211,24 +189,23 @@ public suspend fun withMonitor( try { block(this) } finally { - monitorJobs.forEach(Job::cancel) reader.close() monitor.close() } } -public class ComputeMetrics { - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var unscheduledVms: Int = 0 - public var finishedVms: Int = 0 +class ComputeMetrics { + var submittedVms: Int = 0 + var queuedVms: Int = 0 + var runningVms: Int = 0 + var unscheduledVms: Int = 0 + var finishedVms: Int = 0 } /** * Collect the metrics of the compute service. 
*/ -public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { +fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { val metrics = metricProducer.collectAllMetrics().associateBy { it.name } val res = ComputeMetrics() try { @@ -247,7 +224,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { /** * Process the trace. */ -public suspend fun processTrace( +suspend fun processTrace( clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -306,7 +283,7 @@ public suspend fun processTrace( /** * Create a [MeterProvider] instance for the experiment. */ -public fun createMeterProvider(clock: Clock): MeterProvider { +fun createMeterProvider(clock: Clock): MeterProvider { val powerSelector = InstrumentSelector.builder() .setInstrumentNameRegex("power\\.usage") .setInstrumentType(InstrumentType.VALUE_RECORDER) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b70eefb2..460da303 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -32,29 +32,27 @@ import org.opendc.compute.service.scheduler.* import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import 
org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. * * @param name The name of the portfolio. */ -public abstract class Portfolio(name: String) : Experiment(name) { +abstract class Portfolio(name: String) : Experiment(name) { /** * The logger for this portfolio instance. */ @@ -71,34 +69,29 @@ public abstract class Portfolio(name: String) : Experiment(name) { private val vmPlacements by anyOf(emptyMap<String, String>()) /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null) - - /** * The topology to test. */ - public abstract val topology: Topology + abstract val topology: Topology /** * The workload to test. */ - public abstract val workload: Workload + abstract val workload: Workload /** * The operational phenomenas to consider. */ - public abstract val operationalPhenomena: OperationalPhenomena + abstract val operationalPhenomena: OperationalPhenomena /** * The allocation policies to consider. */ - public abstract val allocationPolicy: String + abstract val allocationPolicy: String /** * A map of trace readers. 
*/ - private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>() + private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>() /** * Perform a single trial for this portfolio. @@ -106,7 +99,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(seeder) @@ -122,14 +115,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) + RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder.asKotlinRandom()) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( File(config.getString("output-path")), diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index 1efd2ddf..d73d14f5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.environment.sc20 +package org.opendc.experiments.capelin.env import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef @@ -37,22 +37,22 @@ import java.util.* /** * A [EnvironmentReader] for the internal environment format. * - * @param environmentFile The file describing the physical cluster. + * @param input The input stream describing the physical cluster. */ -public class Sc20ClusterEnvironmentReader( - private val input: InputStream -) : EnvironmentReader { +class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader { + /** + * Construct a [ClusterEnvironmentReader] for the specified [file]. 
+ */ + constructor(file: File) : this(FileInputStream(file)) - public constructor(file: File) : this(FileInputStream(file)) - - public override fun read(): List<MachineDef> { + override fun read(): List<MachineDef> { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 var memoryPerHostCol = 0 var coresPerHostCol = 0 - var clusterIdx: Int = 0 + var clusterIdx = 0 var clusterId: String var speed: Double var numberOfHosts: Int @@ -116,5 +116,7 @@ public class Sc20ClusterEnvironmentReader( return nodes } - override fun close() {} + override fun close() { + input.close() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 7f25137e..2ebe65ea 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -26,21 +26,17 @@ import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload -import java.util.TreeSet /** * A [TraceReader] for the internal VM workload trace format. * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. + * @param rawReaders The raw trace readers to use.. + * @param workload The workload to use. 
+ * @param seed The seed to use for workload sampling. */ -public class Sc20ParquetTraceReader( - rawReaders: List<Sc20RawParquetTraceReader>, - performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, +class ParquetTraceReader( + rawReaders: List<RawParquetTraceReader>, workload: Workload, seed: Int ) : TraceReader<SimWorkload> { @@ -59,20 +55,6 @@ public class Sc20ParquetTraceReader( } .map { sampleWorkload(it.first, workload, it.second, seed) } .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val id = entry.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) - } - } - } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 54151c9f..94193780 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin.trace -import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -32,14 +31,12 @@ import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID -private val logger = KotlinLogging.logger {} - /** * A [TraceReader] for the internal VM workload trace format. 
* * @param path The directory of the traces. */ -public class Sc20RawParquetTraceReader(private val path: File) { +class RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ @@ -136,14 +133,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<SimWorkload>> = entries - - /** - * Create a [TraceReader] instance. - */ - public fun createReader(): TraceReader<SimWorkload> { - return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() { - override fun close() {} - } - } + fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 6792c2ab..a3b45f47 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -33,8 +33,6 @@ import org.apache.parquet.io.api.Binary import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.format.util.LocalInputFile -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.File @@ -44,7 +42,6 @@ import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -52,14 
+49,9 @@ private val logger = KotlinLogging.logger {} * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param selectedVms The list of VMs to read from the trace. */ -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel? = null, - selectedVms: List<String> = emptyList(), - random: Random -) : TraceReader<SimWorkload> { +class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ @@ -227,14 +219,6 @@ public class Sc20StreamingParquetTraceReader( buffers.remove(id) } - val relevantPerformanceInterferenceModelItems = - if (performanceInterferenceModel != null) - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - else - null val workload = SimTraceWorkload(fragments) val meta = mapOf( "cores" to maxCores, @@ -242,13 +226,7 @@ public class Sc20StreamingParquetTraceReader( "workload" to workload ) - TraceEntry( - uid, id, submissionTime, workload, - if (performanceInterferenceModel != null) - meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) - else - meta - ) + TraceEntry(uid, id, submissionTime, workload, meta) } .sortedBy { it.start } .toList() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index d0031a66..7cd1f159 100644 --- 
a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -41,7 +41,6 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader import org.opendc.format.util.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -53,7 +52,7 @@ import kotlin.math.min /** * Represents the command for converting traces */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { +class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The directory where the trace should be stored. */ @@ -149,24 +148,24 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The supported trace conversions. */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { +sealed class TraceConversion(name: String) : OptionGroup(name) { /** * Read the fragments of the trace. 
*/ - public abstract fun read( + abstract fun read( traceDirectory: File, metaSchema: Schema, metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> } -public class SolvinityConversion : TraceConversion("Solvinity") { +class SolvinityConversion : TraceConversion("Solvinity") { private val clusters by option() .split(",") private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } + .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } .required() override fun read( @@ -335,7 +334,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") { /** * Conversion of the Bitbrains public trace. */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { +class BitbrainsConversion : TraceConversion("Bitbrains") { override fun read( traceDirectory: File, metaSchema: Schema, @@ -447,7 +446,7 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") { /** * Conversion of the Azure public VM trace. 
*/ -public class AzureConversion : TraceConversion("Azure") { +class AzureConversion : TraceConversion("Azure") { private val seed by option(help = "seed for trace sampling") .long() .default(0) @@ -604,18 +603,18 @@ public class AzureConversion : TraceConversion("Azure") { } } -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int ) -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) +class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt index 61bdea60..fb641f1b 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,11 @@ * SOFTWARE. 
*/ -package org.opendc.format.trace.sc20 +package org.opendc.experiments.capelin.trace import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.VmPlacementReader import java.io.InputStream /** @@ -34,18 +33,20 @@ import java.io.InputStream * @param input The input stream to read from. * @param mapper The Jackson object mapper to use. */ -public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - VmPlacementReader { +class VmPlacementReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { /** - * The environment that was read from the file. + * Read the VM placements. */ - private val placements = mapper.readValue<Map<String, String>>(input) - - override fun construct(): Map<String, String> { - return placements + fun read(): Map<String, String> { + return mapper.readValue<Map<String, String>>(input) .mapKeys { "vm__workload__${it.key}.txt" } .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 } - override fun close() {} + override fun close() { + input.close() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4b21b4f7..beaa798f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import 
org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -161,9 +161,8 @@ class CapelinIntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), + return ParquetTraceReader( + listOf(RawParquetTraceReader(File("src/test/resources/trace"))), Workload("test", fraction), seed ) @@ -174,7 +173,7 @@ class CapelinIntegrationTest { */ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) + return ClusterEnvironmentReader(stream) } class TestExperimentReporter : ExperimentMonitor { diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 28928dcb..8fc4f6b8 100644 --- 
a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -38,7 +38,7 @@ import org.opendc.compute.service.scheduler.weights.RandomWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader +import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -53,7 +53,6 @@ import org.opendc.simulator.resources.SimResourceInterpreter import java.io.File import java.time.Clock import java.util.* -import kotlin.random.asKotlinRandom /** * Experiments for the OpenDC project on Energy modeling. 
@@ -88,7 +87,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { val meterProvider: MeterProvider = createMeterProvider(clock) val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = Sc20StreamingParquetTraceReader(File(config.getString("trace-path"), trace), random = Random(1).asKotlinRandom()) + val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt deleted file mode 100644 index 58af8453..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List<Room>) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List<RoomObject>) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - * @property memories The memories in the machine represented as integer identifiers. 
- */ -internal data class Machine(val cpus: List<Int>, val memories: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt deleted file mode 100644 index 9b77702e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. 
- */ - public override fun read(): List<MachineDef> { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - val memories = machine.memories.map { id -> - when (id) { - 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, memories), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. 
- // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt deleted file mode 100644 index f30e64cf..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.Closeable -import kotlin.random.Random - -/** - * An interface for reading descriptions of performance interference models into memory. 
- */ -public interface PerformanceInterferenceModelReader : Closeable { - /** - * Construct a [PerformanceInterferenceModel]. - */ - public fun construct(random: Random): Map<String, PerformanceInterferenceModel> -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index 7df1acd3..797a88d5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,8 +22,6 @@ package org.opendc.format.trace -import java.io.Closeable - /** * An interface for reading workloads into memory. * @@ -31,4 +29,4 @@ import java.io.Closeable * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable +public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt deleted file mode 100644 index 6861affe..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.io.Closeable - -/** - * An interface for reading VM placement data into memory. - */ -public interface VmPlacementReader : Closeable { - /** - * Construct a map of VMs to clusters. - */ - public fun construct(): Map<String, String> -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt deleted file mode 100644 index 0da1f7c2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt +++ /dev/null @@ -1,7 +0,0 @@ -package org.opendc.format.trace.sc20 - -internal data class PerformanceInterferenceEntry( - val vms: List<String>, - val minServerLoad: Double, - val performanceScore: Double -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt deleted file mode 100644 index 4267737d..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without 
limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.InputStream -import java.util.* -import kotlin.random.Random - -/** - * A parser for the JSON performance interference setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - PerformanceInterferenceModelReader { - /** - * The computed value from the file. 
- */ - private val items: Map<String, TreeSet<PerformanceInterferenceModel.Item>> - - init { - val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input) - val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModel.Item>>() - for (entry in entries) { - val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) - for (workload in entry.vms) { - res.computeIfAbsent(workload) { TreeSet() }.add(item) - } - } - - items = res - } - - override fun construct(random: Random): Map<String, PerformanceInterferenceModel> { - return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt deleted file mode 100644 index 1eb4bac2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -public class Sc20TraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String>, - random: Random -) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. 
- */ - init { - val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() - - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - val vms = if (selectedVms.isEmpty()) { - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - } else { - selectedVms.map { - File(traceDirectory, it) - } - } - - vms - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var timestamp: Long - var cores = -1 - var minTime = Long.MAX_VALUE - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEach { line -> - val values = line.split(" ") - - vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - } - } - - val flopsFragments = sequence { - var last: SimTraceWorkload.Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(" ") - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { - val oldFragment = last!! 
- SimTraceWorkload.Fragment( - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - - if (last != null) { - yield(last!!) - } - } - } - - val uuid = UUID(0, idx.toLong()) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), - Random(random.nextInt()) - ) - val workload = SimTraceWorkload(flopsFragments.asSequence()) - entries[uuid] = TraceEntry( - uuid, - vmId, - minTime, - workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt index 09f7de35..f5d0b65e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -47,10 +47,9 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import 
org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File @@ -167,19 +166,7 @@ public class RunnerCli : CliktCommand(name = "runner") { tracePath, scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { - val path = File(traceDir, "performance-interference-model.json") - val operational = scenario.get("operational", Document::class.java) - val enabled = operational.getBoolean("performanceInterferenceEnabled") - - if (!enabled || !path.exists()) { - return@let null - } - - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } - } - + val traceReader = RawParquetTraceReader(traceDir) val targets = portfolio.get("targets", Document::class.java) val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = topologyParser.read(topologyId) @@ -187,7 +174,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.getInteger("repeatsPerScenario")).map { logger.info { "Starting repeat $it" } withTimeout(runTimeout * 1000) { - runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader) + runRepeat(scenario, it, environment, traceReader) } } @@ -203,8 +190,7 @@ public class RunnerCli : CliktCommand(name = "runner") { scenario: Document, repeat: Int, environment: EnvironmentReader, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? 
+ traceReader: RawParquetTraceReader, ): WebExperimentMonitor.Result { val monitor = WebExperimentMonitor() @@ -267,10 +253,8 @@ public class RunnerCli : CliktCommand(name = "runner") { else -> throw IllegalArgumentException("Unknown policy $policyName") } - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( + val trace = ParquetTraceReader( listOf(traceReader), - performanceInterferenceModel, Workload(workloadName, workloadFraction), seed ) |
