summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-15 23:06:08 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:49 +0200
commitb0ece0533825f5cd7983752330847071f4e438c4 (patch)
treeb85df385e33f8ce24fdb9da8af9a6c4bb1cb4810 /opendc-web/opendc-web-runner/src
parent859ce303f0b9110c7110b918e5957c2156fa8b26 (diff)
refactor(capelin): Support flexible topology creation
This change adds support for creating flexible topologies by creating a TopologyFactory interface that is responsible for configuring the hosts of a compute service.
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt116
1 files changed, 63 insertions, 53 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 497a7281..48183d71 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -29,11 +29,12 @@ import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.env.MachineDef
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
@@ -43,6 +44,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
@@ -50,12 +52,12 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
import org.opendc.web.client.model.Scenario
-import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
import java.time.Duration
import java.util.*
import org.opendc.web.client.model.Portfolio as ClientPortfolio
+import org.opendc.web.client.model.Topology as ClientTopology
private val logger = KotlinLogging.logger {}
@@ -129,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -156,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
+ runRepeat(scenario, repeat, topology, traceReader, interferenceModel)
}
}
@@ -171,7 +173,7 @@ class RunnerCli : CliktCommand(name = "runner") {
private suspend fun runRepeat(
scenario: Scenario,
repeat: Int,
- environment: EnvironmentReader,
+ topology: Topology,
traceReader: RawParquetTraceReader,
interferenceModel: VmInterferenceModel?
): WebComputeMonitor.Result {
@@ -202,7 +204,6 @@ class RunnerCli : CliktCommand(name = "runner") {
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
@@ -210,6 +211,9 @@ class RunnerCli : CliktCommand(name = "runner") {
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
try {
+ // Instantiate the topology onto the simulator
+ simulator.apply(topology)
+ // Run workload trace
simulator.run(trace)
} finally {
simulator.close()
@@ -292,56 +296,62 @@ class RunnerCli : CliktCommand(name = "runner") {
}
/**
- * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin.
+ * Convert the specified [topology] into an [Topology] understood by OpenDC.
*/
- private fun convert(topology: Topology): EnvironmentReader {
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
- }
- }
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
+ private fun convert(topology: ClientTopology): Topology {
+ return object : Topology {
+
+ override fun resolve(): List<HostSpec> {
+ val res = mutableListOf<HostSpec>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+ }
+ }
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ val powerDriver = SimplePowerDriver(powerModel)
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
- )
- )
- }
+ val spec = HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ powerDriver
+ )
+
+ res += spec
+ }
+
+ return res
+ }
- return object : EnvironmentReader {
- override fun read(): List<MachineDef> = nodes
- override fun close() {}
+ override fun toString(): String = "WebRunnerTopologyFactory"
}
}
}