summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt99
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt48
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt)13
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt)22
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt25
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt39
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json22
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt116
13 files changed, 474 insertions, 302 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index cc9f2705..29d84a9a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -33,12 +33,9 @@ import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.workload.env.MachineDef
+import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
@@ -50,15 +47,19 @@ import kotlin.math.max
/**
* Helper class to simulated VM-based workloads in OpenDC.
+ *
+ * @param context [CoroutineContext] to run the simulation in.
+ * @param clock [Clock] instance tracking simulation time.
+ * @param scheduler [ComputeScheduler] implementation to use for the service.
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interferenceModel The model to use for performance interference.
*/
public class ComputeWorkloadRunner(
private val context: CoroutineContext,
private val clock: Clock,
scheduler: ComputeScheduler,
- machines: List<MachineDef>,
private val failureModel: FailureModel? = null,
- interferenceModel: VmInterferenceModel? = null,
- hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+ private val interferenceModel: VmInterferenceModel? = null,
) : AutoCloseable {
/**
* The [ComputeService] that has been configured by the manager.
@@ -86,13 +87,6 @@ public class ComputeWorkloadRunner(
val (service, serviceMeterProvider) = createService(scheduler)
this._metricProducers.add(serviceMeterProvider)
this.service = service
-
- for (def in machines) {
- val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
- this._metricProducers.add(hostMeterProvider)
- hosts.add(host)
- this.service.addHost(host)
- }
}
/**
@@ -156,6 +150,46 @@ public class ComputeWorkloadRunner(
}
}
+ /**
+ * Register a host for this simulation.
+ *
+ * @param spec The definition of the host.
+ * @return The [SimHost] that has been constructed by the runner.
+ */
+ public fun registerHost(spec: HostSpec): SimHost {
+ val resource = Resource.builder()
+ .put(HOST_ID, spec.uid.toString())
+ .put(HOST_NAME, spec.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, spec.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+ _metricProducers.add(meterProvider)
+
+ val host = SimHost(
+ spec.uid,
+ spec.name,
+ spec.model,
+ spec.meta,
+ context,
+ interpreter,
+ meterProvider,
+ spec.hypervisor,
+ powerDriver = spec.powerDriver,
+ interferenceDomain = interferenceModel?.newDomain(),
+ )
+
+ hosts.add(host)
+ service.addHost(host)
+
+ return host
+ }
+
override fun close() {
service.close()
@@ -182,41 +216,4 @@ public class ComputeWorkloadRunner(
val service = ComputeService(context, clock, meterProvider, scheduler)
return service to meterProvider
}
-
- /**
- * Construct a [SimHost] instance for the specified [MachineDef].
- */
- private fun createHost(
- def: MachineDef,
- hypervisorProvider: SimHypervisorProvider,
- interferenceModel: VmInterferenceModel? = null
- ): Pair<SimHost, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(HOST_ID, def.uid.toString())
- .put(HOST_NAME, def.name)
- .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(HOST_NCPUS, def.model.cpus.size)
- .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val host = SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- context,
- interpreter,
- meterProvider,
- hypervisorProvider,
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
-
- return host to meterProvider
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
new file mode 100644
index 00000000..f3dc1e9e
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.topology
+
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerDriver
+import java.util.*
+
+/**
+ * Description of a physical host that will be simulated by OpenDC and host the virtual machines.
+ *
+ * @param uid Unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param model The physical model of the machine.
+ * @param powerDriver The [PowerDriver] to model the power consumption of the machine.
+ * @param hypervisor The hypervisor implementation to use.
+ */
+public data class HostSpec(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerDriver: PowerDriver,
+ val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider()
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
index 8d61c530..3b8dc918 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
@@ -20,17 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.env
-
-import org.opendc.compute.workload.env.MachineDef
-import java.io.Closeable
+package org.opendc.compute.workload.topology
/**
- * An interface for reading descriptions of topology environments into memory.
+ * Representation of the environment of the compute service, describing the physical details of every host.
*/
-public interface EnvironmentReader : Closeable {
+public interface Topology {
/**
- * Read the environment into a list.
+ * Resolve the [Topology] into a list of [HostSpec]s.
*/
- public fun read(): List<MachineDef>
+ public fun resolve(): List<HostSpec>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
index c1695696..09159a93 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
@@ -20,19 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.env
+@file:JvmName("TopologyHelpers")
+package org.opendc.compute.workload.topology
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
-import java.util.*
+import org.opendc.compute.workload.ComputeWorkloadRunner
/**
- * A definition of a machine in a cluster.
+ * Apply the specified [topology] to the given [ComputeWorkloadRunner].
*/
-public data class MachineDef(
- val uid: UUID,
- val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerModel: PowerModel
-)
+public fun ComputeWorkloadRunner.apply(topology: Topology) {
+ val hosts = topology.resolve()
+ for (spec in hosts) {
+ registerHost(spec)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 010d18b0..4bcbaf61 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -45,6 +45,7 @@ dependencies {
implementation(libs.kotlin.logging)
implementation(libs.jackson.databind)
implementation(libs.jackson.module.kotlin)
+ implementation(libs.jackson.dataformat.csv)
implementation(kotlin("reflect"))
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 06db5569..02811d83 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -27,13 +27,14 @@ import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
@@ -100,12 +101,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
*/
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = ClusterEnvironmentReader(
- File(
- config.getString("env-path"),
- "${topology.name}.txt"
- )
- )
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
@@ -133,11 +128,10 @@ abstract class Portfolio(name: String) : Experiment(name) {
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
else
null
- val simulator = ComputeWorkloadRunner(
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
performanceInterferenceModel
)
@@ -147,17 +141,22 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
- simulator.run(trace)
+ // Instantiate the desired topology
+ runner.apply(topology)
+
+ // Run the workload trace
+ runner.run(trace)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
deleted file mode 100644
index 8d9b24f4..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import org.opendc.compute.workload.env.MachineDef
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import java.io.File
-import java.io.FileInputStream
-import java.io.InputStream
-import java.util.*
-
-/**
- * A [EnvironmentReader] for the internal environment format.
- *
- * @param input The input stream describing the physical cluster.
- */
-class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
- /**
- * Construct a [ClusterEnvironmentReader] for the specified [file].
- */
- constructor(file: File) : this(FileInputStream(file))
-
- override fun read(): List<MachineDef> {
- var clusterIdCol = 0
- var speedCol = 0
- var numberOfHostsCol = 0
- var memoryPerHostCol = 0
- var coresPerHostCol = 0
-
- var clusterIdx = 0
- var clusterId: String
- var speed: Double
- var numberOfHosts: Int
- var memoryPerHost: Long
- var coresPerHost: Int
-
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- input.bufferedReader().use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the file
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(";")
-
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- clusterIdCol = header["ClusterID"]!!
- speedCol = header["Speed"]!!
- numberOfHostsCol = header["numberOfHosts"]!!
- memoryPerHostCol = header["memoryCapacityPerHost"]!!
- coresPerHostCol = header["coreCountPerHost"]!!
- return@forEachIndexed
- }
-
- clusterIdx++
- clusterId = values[clusterIdCol].trim()
- speed = values[speedCol].trim().toDouble() * 1000.0
- numberOfHosts = values[numberOfHostsCol].trim().toInt()
- memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
- coresPerHost = values[coresPerHostCol].trim().toInt()
-
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
-
- repeat(numberOfHosts) {
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$it",
- mapOf("cluster" to clusterId),
- MachineModel(
- List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
- },
- listOf(unknownMemoryUnit)
- ),
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearPowerModel(350.0, idlePower = 200.0)
- )
- )
- }
- }
- }
-
- return nodes
- }
-
- override fun close() {
- input.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
new file mode 100644
index 00000000..b8b65d28
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+/**
+ * Definition of a compute cluster modeled in the simulation.
+ *
+ * @param id A unique identifier representing the compute cluster.
+ * @param name The name of the cluster.
+ * @param cpuCount The total number of CPUs in the cluster.
+ * @param cpuSpeed The speed of a CPU in the cluster in MHz.
+ * @param memCapacity The total memory capacity of the cluster (in MiB).
+ * @param hostCount The number of hosts in the cluster.
+ * @param memCapacityPerHost The memory capacity per host in the cluster (MiB).
+ * @param cpuCountPerHost The number of CPUs per host in the cluster.
+ */
+public data class ClusterSpec(
+ val id: String,
+ val name: String,
+ val cpuCount: Int,
+ val cpuSpeed: Double,
+ val memCapacity: Double,
+ val hostCount: Int,
+ val memCapacityPerHost: Double,
+ val cpuCountPerHost: Int
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
new file mode 100644
index 00000000..5a175f2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.databind.ObjectReader
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A helper class for reading a cluster specification file.
+ */
+class ClusterSpecReader {
+ /**
+ * The [CsvMapper] to map the environment file to an object.
+ */
+ private val mapper = CsvMapper()
+
+ /**
+ * The [ObjectReader] to convert the lines into objects.
+ */
+ private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema)
+
+ /**
+ * Read the specified [file].
+ */
+ fun read(file: File): List<ClusterSpec> {
+ return reader.readValues<Entry>(file).use { read(it) }
+ }
+
+ /**
+ * Read the specified [input].
+ */
+ fun read(input: InputStream): List<ClusterSpec> {
+ return reader.readValues<Entry>(input).use { read(it) }
+ }
+
+ /**
+ * Convert the specified [MappingIterator] into a list of [ClusterSpec]s.
+ */
+ private fun read(it: MappingIterator<Entry>): List<ClusterSpec> {
+ val result = mutableListOf<ClusterSpec>()
+
+ for (entry in it) {
+ val def = ClusterSpec(
+ entry.id,
+ entry.name,
+ entry.cpuCount,
+ entry.cpuSpeed * 1000, // Convert to MHz
+ entry.memCapacity * 1000, // Convert to MiB
+ entry.hostCount,
+ entry.memCapacityPerHost * 1000,
+ entry.cpuCountPerHost
+ )
+ result.add(def)
+ }
+
+ return result
+ }
+
+ private open class Entry(
+ @JsonProperty("ClusterID")
+ val id: String,
+ @JsonProperty("ClusterName")
+ val name: String,
+ @JsonProperty("Cores")
+ val cpuCount: Int,
+ @JsonProperty("Speed")
+ val cpuSpeed: Double,
+ @JsonProperty("Memory")
+ val memCapacity: Double,
+ @JsonProperty("numberOfHosts")
+ val hostCount: Int,
+ @JsonProperty("memoryCapacityPerHost")
+ val memCapacityPerHost: Double,
+ @JsonProperty("coreCountPerHost")
+ val cpuCountPerHost: Int
+ )
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("ClusterID", CsvSchema.ColumnType.STRING)
+ .addColumn("ClusterName", CsvSchema.ColumnType.STRING)
+ .addColumn("Cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Speed", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory", CsvSchema.ColumnType.NUMBER)
+ .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER)
+ .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER)
+ .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setColumnSeparator(';')
+ .setUseHeader(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
new file mode 100644
index 00000000..5ab4261a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyFactories")
+package org.opendc.experiments.capelin.topology
+
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.math.roundToLong
+
+/**
+ * A [ClusterSpecReader] that is used to read the cluster definition file.
+ */
+private val reader = ClusterSpecReader()
+
+/**
+ * Construct a [Topology] from the specified [file].
+ */
+fun clusterTopology(
+ file: File,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(file), powerModel, random)
+
+/**
+ * Construct a [Topology] from the specified [input].
+ */
+fun clusterTopology(
+ input: InputStream,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(input), powerModel, random)
+
+/**
+ * Construct a [Topology] from the given list of [clusters].
+ */
+fun clusterTopology(
+ clusters: List<ClusterSpec>,
+ powerModel: PowerModel,
+ random: Random = Random(0)
+): Topology {
+ return object : Topology {
+ override fun resolve(): List<HostSpec> {
+ val hosts = mutableListOf<HostSpec>()
+ for (cluster in clusters) {
+ val cpuSpeed = cluster.cpuSpeed
+ val memoryPerHost = cluster.memCapacityPerHost.roundToLong()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val machineModel = MachineModel(
+ List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) },
+ listOf(unknownMemoryUnit)
+ )
+
+ repeat(cluster.hostCount) {
+ val spec = HostSpec(
+ UUID(random.nextLong(), it.toLong()),
+ "node-${cluster.name}-$it",
+ mapOf("cluster" to cluster.id),
+ machineModel,
+ SimplePowerDriver(powerModel)
+ )
+
+ hosts += spec
+ }
+ }
+
+ return hosts
+ }
+
+ override fun toString(): String = "ClusterSpecTopology"
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index b0f86346..c1386bfe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -33,12 +33,13 @@ import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.trace.TraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
-import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
@@ -84,18 +85,16 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
-
val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
- computeScheduler,
- environmentReader.read(),
+ computeScheduler
)
-
+ val topology = createTopology()
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
+ simulator.apply(topology)
simulator.run(traceReader)
} finally {
simulator.close()
@@ -133,18 +132,17 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
- computeScheduler,
- environmentReader.read(),
+ computeScheduler
)
-
+ val topology = createTopology("single")
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
+ simulator.apply(topology)
simulator.run(traceReader)
} finally {
simulator.close()
@@ -177,7 +175,6 @@ class CapelinIntegrationTest {
fun testInterference() = runBlockingSimulation {
val seed = 1
val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
@@ -189,13 +186,13 @@ class CapelinIntegrationTest {
coroutineContext,
clock,
computeScheduler,
- environmentReader.read(),
interferenceModel = performanceInterferenceModel
)
-
+ val topology = createTopology("single")
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
+ simulator.apply(topology)
simulator.run(traceReader)
} finally {
simulator.close()
@@ -227,20 +224,18 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environmentReader.read(),
grid5000(Duration.ofDays(7), seed)
)
-
+ val topology = createTopology("single")
+ val traceReader = createTestTraceReader(0.25, seed)
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
+ simulator.apply(topology)
simulator.run(traceReader)
} finally {
simulator.close()
@@ -279,11 +274,11 @@ class CapelinIntegrationTest {
}
/**
- * Obtain the environment reader for the test.
+ * Obtain the topology factory for the test.
*/
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ private fun createTopology(name: String = "topology"): Topology {
val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
- return ClusterEnvironmentReader(stream)
+ return stream.use { clusterTopology(stream) }
}
class TestExperimentReporter : ComputeMonitor {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
deleted file mode 100644
index 1be5852b..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
+++ /dev/null
@@ -1,22 +0,0 @@
-[
- {
- "vms": [
- "vm_a",
- "vm_c",
- "vm_x",
- "vm_y"
- ],
- "minServerLoad": 0.0,
- "performanceScore": 0.8830158730158756
- },
- {
- "vms": [
- "vm_a",
- "vm_b",
- "vm_c",
- "vm_d"
- ],
- "minServerLoad": 0.0,
- "performanceScore": 0.7133055555552751
- }
-]
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 497a7281..48183d71 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -29,11 +29,12 @@ import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.env.MachineDef
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
@@ -43,6 +44,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
@@ -50,12 +52,12 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
import org.opendc.web.client.model.Scenario
-import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
import java.time.Duration
import java.util.*
import org.opendc.web.client.model.Portfolio as ClientPortfolio
+import org.opendc.web.client.model.Topology as ClientTopology
private val logger = KotlinLogging.logger {}
@@ -129,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -156,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
+ runRepeat(scenario, repeat, topology, traceReader, interferenceModel)
}
}
@@ -171,7 +173,7 @@ class RunnerCli : CliktCommand(name = "runner") {
private suspend fun runRepeat(
scenario: Scenario,
repeat: Int,
- environment: EnvironmentReader,
+ topology: Topology,
traceReader: RawParquetTraceReader,
interferenceModel: VmInterferenceModel?
): WebComputeMonitor.Result {
@@ -202,7 +204,6 @@ class RunnerCli : CliktCommand(name = "runner") {
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
@@ -210,6 +211,9 @@ class RunnerCli : CliktCommand(name = "runner") {
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
try {
+ // Instantiate the topology onto the simulator
+ simulator.apply(topology)
+ // Run workload trace
simulator.run(trace)
} finally {
simulator.close()
@@ -292,56 +296,62 @@ class RunnerCli : CliktCommand(name = "runner") {
}
/**
- * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin.
+ * Convert the specified [topology] into an [Topology] understood by OpenDC.
*/
- private fun convert(topology: Topology): EnvironmentReader {
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
- }
- }
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
+ private fun convert(topology: ClientTopology): Topology {
+ return object : Topology {
+
+ override fun resolve(): List<HostSpec> {
+ val res = mutableListOf<HostSpec>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+ }
+ }
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ val powerDriver = SimplePowerDriver(powerModel)
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
- )
- )
- }
+ val spec = HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ powerDriver
+ )
+
+ res += spec
+ }
+
+ return res
+ }
- return object : EnvironmentReader {
- override fun read(): List<MachineDef> = nodes
- override fun close() {}
+ override fun toString(): String = "WebRunnerTopologyFactory"
}
}
}