summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-15 23:06:08 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:49 +0200
commitb0ece0533825f5cd7983752330847071f4e438c4 (patch)
treeb85df385e33f8ce24fdb9da8af9a6c4bb1cb4810 /opendc-experiments/opendc-experiments-capelin/src/main
parent859ce303f0b9110c7110b918e5957c2156fa8b26 (diff)
refactor(capelin): Support flexible topology creation
This change adds support for creating flexible topologies by creating a TopologyFactory interface that is responsible for configuring the hosts of a compute service.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt25
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt)32
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt103
5 files changed, 257 insertions, 145 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 06db5569..02811d83 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -27,13 +27,14 @@ import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
@@ -100,12 +101,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
*/
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = ClusterEnvironmentReader(
- File(
- config.getString("env-path"),
- "${topology.name}.txt"
- )
- )
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
@@ -133,11 +128,10 @@ abstract class Portfolio(name: String) : Experiment(name) {
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
else
null
- val simulator = ComputeWorkloadRunner(
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
performanceInterferenceModel
)
@@ -147,17 +141,22 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
- simulator.run(trace)
+ // Instantiate the desired topology
+ runner.apply(topology)
+
+ // Run the workload trace
+ runner.run(trace)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
deleted file mode 100644
index 8d9b24f4..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import org.opendc.compute.workload.env.MachineDef
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import java.io.File
-import java.io.FileInputStream
-import java.io.InputStream
-import java.util.*
-
-/**
- * A [EnvironmentReader] for the internal environment format.
- *
- * @param input The input stream describing the physical cluster.
- */
-class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
- /**
- * Construct a [ClusterEnvironmentReader] for the specified [file].
- */
- constructor(file: File) : this(FileInputStream(file))
-
- override fun read(): List<MachineDef> {
- var clusterIdCol = 0
- var speedCol = 0
- var numberOfHostsCol = 0
- var memoryPerHostCol = 0
- var coresPerHostCol = 0
-
- var clusterIdx = 0
- var clusterId: String
- var speed: Double
- var numberOfHosts: Int
- var memoryPerHost: Long
- var coresPerHost: Int
-
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- input.bufferedReader().use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the file
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(";")
-
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- clusterIdCol = header["ClusterID"]!!
- speedCol = header["Speed"]!!
- numberOfHostsCol = header["numberOfHosts"]!!
- memoryPerHostCol = header["memoryCapacityPerHost"]!!
- coresPerHostCol = header["coreCountPerHost"]!!
- return@forEachIndexed
- }
-
- clusterIdx++
- clusterId = values[clusterIdCol].trim()
- speed = values[speedCol].trim().toDouble() * 1000.0
- numberOfHosts = values[numberOfHostsCol].trim().toInt()
- memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
- coresPerHost = values[coresPerHostCol].trim().toInt()
-
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
-
- repeat(numberOfHosts) {
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$it",
- mapOf("cluster" to clusterId),
- MachineModel(
- List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
- },
- listOf(unknownMemoryUnit)
- ),
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearPowerModel(350.0, idlePower = 200.0)
- )
- )
- }
- }
- }
-
- return nodes
- }
-
- override fun close() {
- input.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
index 8d61c530..b8b65d28 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
@@ -20,17 +20,27 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.env
-
-import org.opendc.compute.workload.env.MachineDef
-import java.io.Closeable
+package org.opendc.experiments.capelin.topology
/**
- * An interface for reading descriptions of topology environments into memory.
+ * Definition of a compute cluster modeled in the simulation.
+ *
+ * @param id A unique identifier representing the compute cluster.
+ * @param name The name of the cluster.
+ * @param cpuCount The total number of CPUs in the cluster.
+ * @param cpuSpeed The speed of a CPU in the cluster in MHz.
+ * @param memCapacity The total memory capacity of the cluster (in MiB).
+ * @param hostCount The number of hosts in the cluster.
+ * @param memCapacityPerHost The memory capacity per host in the cluster (MiB).
+ * @param cpuCountPerHost The number of CPUs per host in the cluster.
*/
-public interface EnvironmentReader : Closeable {
- /**
- * Read the environment into a list.
- */
- public fun read(): List<MachineDef>
-}
+public data class ClusterSpec(
+ val id: String,
+ val name: String,
+ val cpuCount: Int,
+ val cpuSpeed: Double,
+ val memCapacity: Double,
+ val hostCount: Int,
+ val memCapacityPerHost: Double,
+ val cpuCountPerHost: Int
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
new file mode 100644
index 00000000..5a175f2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.databind.ObjectReader
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A helper class for reading a cluster specification file.
+ */
+class ClusterSpecReader {
+ /**
+ * The [CsvMapper] to map the environment file to an object.
+ */
+ private val mapper = CsvMapper()
+
+ /**
+ * The [ObjectReader] to convert the lines into objects.
+ */
+ private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema)
+
+ /**
+ * Read the specified [file].
+ */
+ fun read(file: File): List<ClusterSpec> {
+ return reader.readValues<Entry>(file).use { read(it) }
+ }
+
+ /**
+ * Read the specified [input].
+ */
+ fun read(input: InputStream): List<ClusterSpec> {
+ return reader.readValues<Entry>(input).use { read(it) }
+ }
+
+ /**
+ * Convert the specified [MappingIterator] into a list of [ClusterSpec]s.
+ */
+ private fun read(it: MappingIterator<Entry>): List<ClusterSpec> {
+ val result = mutableListOf<ClusterSpec>()
+
+ for (entry in it) {
+ val def = ClusterSpec(
+ entry.id,
+ entry.name,
+ entry.cpuCount,
+ entry.cpuSpeed * 1000, // Convert to MHz
+ entry.memCapacity * 1000, // Convert to MiB
+ entry.hostCount,
+ entry.memCapacityPerHost * 1000,
+ entry.cpuCountPerHost
+ )
+ result.add(def)
+ }
+
+ return result
+ }
+
+ private open class Entry(
+ @JsonProperty("ClusterID")
+ val id: String,
+ @JsonProperty("ClusterName")
+ val name: String,
+ @JsonProperty("Cores")
+ val cpuCount: Int,
+ @JsonProperty("Speed")
+ val cpuSpeed: Double,
+ @JsonProperty("Memory")
+ val memCapacity: Double,
+ @JsonProperty("numberOfHosts")
+ val hostCount: Int,
+ @JsonProperty("memoryCapacityPerHost")
+ val memCapacityPerHost: Double,
+ @JsonProperty("coreCountPerHost")
+ val cpuCountPerHost: Int
+ )
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("ClusterID", CsvSchema.ColumnType.STRING)
+ .addColumn("ClusterName", CsvSchema.ColumnType.STRING)
+ .addColumn("Cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Speed", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory", CsvSchema.ColumnType.NUMBER)
+ .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER)
+ .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER)
+ .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setColumnSeparator(';')
+ .setUseHeader(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
new file mode 100644
index 00000000..5ab4261a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyFactories")
+package org.opendc.experiments.capelin.topology
+
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.math.roundToLong
+
+/**
+ * A [ClusterSpecReader] that is used to read the cluster definition file.
+ */
+private val reader = ClusterSpecReader()
+
+/**
+ * Construct a [Topology] from the specified [file].
+ */
+fun clusterTopology(
+ file: File,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(file), powerModel, random)
+
+/**
+ * Construct a [Topology] from the specified [input].
+ */
+fun clusterTopology(
+ input: InputStream,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(input), powerModel, random)
+
+/**
+ * Construct a [Topology] from the given list of [clusters].
+ */
+fun clusterTopology(
+ clusters: List<ClusterSpec>,
+ powerModel: PowerModel,
+ random: Random = Random(0)
+): Topology {
+ return object : Topology {
+ override fun resolve(): List<HostSpec> {
+ val hosts = mutableListOf<HostSpec>()
+ for (cluster in clusters) {
+ val cpuSpeed = cluster.cpuSpeed
+ val memoryPerHost = cluster.memCapacityPerHost.roundToLong()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val machineModel = MachineModel(
+ List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) },
+ listOf(unknownMemoryUnit)
+ )
+
+ repeat(cluster.hostCount) {
+ val spec = HostSpec(
+ UUID(random.nextLong(), it.toLong()),
+ "node-${cluster.name}-$it",
+ mapOf("cluster" to cluster.id),
+ machineModel,
+ SimplePowerDriver(powerModel)
+ )
+
+ hosts += spec
+ }
+ }
+
+ return hosts
+ }
+
+ override fun toString(): String = "ClusterSpecTopology"
+ }
+}