diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-19 14:33:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-19 14:33:05 +0200 |
| commit | 453c25c4b453fa0af26bebbd8863abfb79218119 (patch) | |
| tree | 7977e81ec34ce7f57d8b14a717ccb63f54cd03cb /opendc-web | |
| parent | c1b9719aad10566c9d17f9eb757236c58a602b89 (diff) | |
| parent | 76a0f8889a4990108bc7906556dec6381647404b (diff) | |
merge: Enable re-use of virtual machine workload helpers
This pull request enables re-use of virtual machine workload helpers by extracting the helpers into a
separate module which may be used by other experiments.
- Support workload/machine CPU count mismatch
- Extract common code out of Capelin experiments
- Support flexible topology creation
- Add option for optimizing SimHost simulation
- Support creating CPU-optimized topology
- Make workload sampling model extensible
- Add support for extended Bitbrains trace format
- Add support for Azure VM trace format
- Add support for internal OpenDC VM trace format
- Optimize OpenDC VM trace format
- Add tool for converting workload traces
- Remove dependency on SnakeYaml
**Breaking API Changes**
- `RESOURCE_NCPU` and `RESOURCE_STATE_NCPU` are renamed to `RESOURCE_CPU_COUNT` and `RESOURCE_STATE_CPU_COUNT` respectively.
Diffstat (limited to 'opendc-web')
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt | 145 |
1 files changed, 72 insertions, 73 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 483558e1..1b518fee 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,14 +28,12 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.compute.workload.* +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.topology.Topology +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel @@ -43,6 +41,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics @@ -50,12 +49,12 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration import org.opendc.web.client.model.Scenario -import org.opendc.web.client.model.Topology import java.io.File import java.net.URI import java.time.Duration import java.util.* import org.opendc.web.client.model.Portfolio as ClientPortfolio +import org.opendc.web.client.model.Topology as ClientTopology private val logger = KotlinLogging.logger {} @@ -129,18 +128,14 @@ class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> { val id = scenario.id logger.info { "Constructing performance interference model" } - val traceDir = File( - tracePath, - scenario.trace.traceId - ) - val traceReader = RawParquetTraceReader(traceDir) + val workloadLoader = ComputeWorkloadLoader(tracePath) val interferenceGroups = let { - val path = File(traceDir, "performance-interference-model.json") + val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -156,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, environment, traceReader, interferenceModel) + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) } } @@ -171,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") { private suspend fun runRepeat( scenario: Scenario, repeat: Int, - environment: EnvironmentReader, - traceReader: RawParquetTraceReader, + topology: Topology, + workloadLoader: ComputeWorkloadLoader, interferenceModel: VmInterferenceModel? ): WebComputeMonitor.Result { val monitor = WebComputeMonitor() @@ -186,23 +181,18 @@ class RunnerCli : CliktCommand(name = "runner") { val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) + val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction)) - val trace = ParquetTraceReader( - listOf(traceReader), - Workload(workloadName, workloadFraction), - repeat - ) val failureModel = if (operational.failuresEnabled) - grid5000(Duration.ofDays(7), repeat) + grid5000(Duration.ofDays(7)) else null - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, - environment.read(), failureModel, interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) @@ -210,7 +200,10 @@ class RunnerCli : CliktCommand(name = "runner") { val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1)) try { - simulator.run(trace) + // Instantiate the topology onto the simulator + simulator.apply(topology) + // Run workload trace + simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() metricReader.close() @@ -292,56 +285,62 @@ class RunnerCli : CliktCommand(name = "runner") { } /** - * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin. + * Convert the specified [topology] into an [Topology] understood by OpenDC. */ - private fun convert(topology: Topology): EnvironmentReader { - val nodes = mutableListOf<MachineDef>() - val random = Random(0) - - val machines = topology.rooms.asSequence() - .flatMap { room -> - room.tiles.flatMap { tile -> - tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() - } - } - for ((rack, machine) in machines) { - val clusterId = rack.id - val position = machine.position - - val processors = machine.cpus.flatMap { cpu -> - val cores = cpu.numberOfCores - val speed = cpu.clockRateMhz - // TODO Remove hard coding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.name, cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.memory.map { memory -> - MemoryUnit( - "Samsung", - memory.name, - memory.speedMbPerS, - memory.sizeMb.toLong() - ) - } + private fun convert(topology: ClientTopology): Topology { + return object : Topology { + + override fun resolve(): List<HostSpec> { + val res = mutableListOf<HostSpec>() + val random = Random(0) + + val machines = topology.rooms.asSequence() + .flatMap { room -> + room.tiles.flatMap { tile -> + tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() + } + } + for ((rack, machine) in machines) { + val clusterId = rack.id + val position = machine.position + + val processors = machine.cpus.flatMap { cpu -> + val cores = cpu.numberOfCores + val speed = cpu.clockRateMhz + // TODO Remove hard coding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.name, cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.memory.map { memory -> + MemoryUnit( + "Samsung", + memory.name, + memory.speedMbPerS, + memory.sizeMb.toLong() + ) + } - val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) + val powerDriver = SimplePowerDriver(powerModel) - nodes.add( - MachineDef( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf("cluster" to clusterId), - MachineModel(processors, memoryUnits), - LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) - ) - ) - } + val spec = HostSpec( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + MachineModel(processors, memoryUnits), + powerDriver + ) + + res += spec + } + + return res + } - return object : EnvironmentReader { - override fun read(): List<MachineDef> = nodes - override fun close() {} + override fun toString(): String = "WebRunnerTopologyFactory" } } } |
