summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-24 14:29:29 +0200
committerGitHub <noreply@github.com>2021-06-24 14:29:29 +0200
commit36cb3c0cf642990a7b087a56d627a0de4fe2e71f (patch)
tree67c09fa437bc9b1f37f23b80b970b6aa686ad818 /opendc-experiments/opendc-experiments-capelin/src
parenta29a61334adb8432c69800b19508eca4eff4bfd1 (diff)
parente56967a29ac2b2d26cc085b1f3e27096dad6a170 (diff)
simulator: Support perf interference in uniform resource model
This pull request re-implements the performance interference model to integrate with the uniform resource model in OpenDC. This forms the basis for other forms of resource interference (e.g., network or disk). * Add interface for resource interference in uniform resource model (`opendc-simulator-resources`) * Remove dependency on performance interference model from trace readers * Re-implement the performance interference model on top of the interface in the uniform resource model. **Breaking API Changes** * The original performance interference model classes are removed * The SC20 trace and environment related readers have moved to the Capelin experiments module. * Changes to the interfaces in `opendc-format`. Implements #103
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt45
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt122
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt)16
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt)33
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt52
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt15
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json22
12 files changed, 381 insertions, 148 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 47f5f71e..9548253d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -41,11 +41,11 @@ import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
@@ -53,7 +53,6 @@ import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
-import java.io.File
import java.time.Clock
import kotlin.coroutines.resume
import kotlin.math.ln
@@ -68,7 +67,7 @@ private val logger = KotlinLogging.logger {}
/**
* Construct the failure domain for the experiments.
*/
-public fun createFailureDomain(
+fun createFailureDomain(
coroutineScope: CoroutineScope,
clock: Clock,
seed: Int,
@@ -100,7 +99,7 @@ public fun createFailureDomain(
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-public fun createFaultInjector(
+fun createFaultInjector(
coroutineScope: CoroutineScope,
clock: Clock,
random: Random,
@@ -119,30 +118,14 @@ public fun createFaultInjector(
}
/**
- * Create the trace reader from which the VM workloads are read.
- */
-public fun createTraceReader(
- path: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- vms: List<String>,
- seed: Int
-): Sc20StreamingParquetTraceReader {
- return Sc20StreamingParquetTraceReader(
- path,
- performanceInterferenceModel,
- vms,
- Random(seed)
- )
-}
-
-/**
* Construct the environment for a simulated compute service..
*/
-public suspend fun withComputeService(
+suspend fun withComputeService(
clock: Clock,
meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
scheduler: ComputeScheduler,
+ interferenceModel: VmInterferenceModel? = null,
block: suspend CoroutineScope.(ComputeService) -> Unit
): Unit = coroutineScope {
val interpreter = SimResourceInterpreter(coroutineContext, clock)
@@ -158,7 +141,8 @@ public suspend fun withComputeService(
interpreter,
meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
- def.powerModel
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
)
}
@@ -181,16 +165,13 @@ public suspend fun withComputeService(
/**
* Attach the specified monitor to the VM provisioner.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun withMonitor(
+suspend fun withMonitor(
monitor: ExperimentMonitor,
clock: Clock,
metricProducer: MetricProducer,
scheduler: ComputeService,
block: suspend CoroutineScope.() -> Unit
): Unit = coroutineScope {
- val monitorJobs = mutableSetOf<Job>()
-
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -211,24 +192,23 @@ public suspend fun withMonitor(
try {
block(this)
} finally {
- monitorJobs.forEach(Job::cancel)
reader.close()
monitor.close()
}
}
-public class ComputeMetrics {
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var unscheduledVms: Int = 0
- public var finishedVms: Int = 0
+class ComputeMetrics {
+ var submittedVms: Int = 0
+ var queuedVms: Int = 0
+ var runningVms: Int = 0
+ var unscheduledVms: Int = 0
+ var finishedVms: Int = 0
}
/**
* Collect the metrics of the compute service.
*/
-public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
val res = ComputeMetrics()
try {
@@ -247,7 +227,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
/**
* Process the trace.
*/
-public suspend fun processTrace(
+suspend fun processTrace(
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
@@ -306,7 +286,7 @@ public suspend fun processTrace(
/**
* Create a [MeterProvider] instance for the experiment.
*/
-public fun createMeterProvider(clock: Clock): MeterProvider {
+fun createMeterProvider(clock: Clock): MeterProvider {
val powerSelector = InstrumentSelector.builder()
.setInstrumentNameRegex("power\\.usage")
.setInstrumentType(InstrumentType.VALUE_RECORDER)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index b70eefb2..cbb5bfd9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -32,29 +32,30 @@ import org.opendc.compute.service.scheduler.*
import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.*
+import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
+import java.io.FileInputStream
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
*
* @param name The name of the portfolio.
*/
-public abstract class Portfolio(name: String) : Experiment(name) {
+abstract class Portfolio(name: String) : Experiment(name) {
/**
* The logger for this portfolio instance.
*/
@@ -71,34 +72,29 @@ public abstract class Portfolio(name: String) : Experiment(name) {
private val vmPlacements by anyOf(emptyMap<String, String>())
/**
- * The path to the performance interference model.
- */
- private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
-
- /**
* The topology to test.
*/
- public abstract val topology: Topology
+ abstract val topology: Topology
/**
* The workload to test.
*/
- public abstract val workload: Workload
+ abstract val workload: Workload
/**
* The operational phenomenas to consider.
*/
- public abstract val operationalPhenomena: OperationalPhenomena
+ abstract val operationalPhenomena: OperationalPhenomena
/**
* The allocation policies to consider.
*/
- public abstract val allocationPolicy: String
+ abstract val allocationPolicy: String
/**
* A map of trace readers.
*/
- private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
+ private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
/**
* Perform a single trial for this portfolio.
@@ -106,7 +102,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
@OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
+ val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createComputeScheduler(seeder)
@@ -122,14 +118,17 @@ public abstract class Portfolio(name: String) : Experiment(name) {
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
- Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
+ RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
- val performanceInterferenceModel = performanceInterferenceModel
- ?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
+ val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
+ PerformanceInterferenceReader(FileInputStream(config.getString("interference-model")))
+ .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) }
+ else
+ null
+
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val monitor = ParquetExperimentMonitor(
File(config.getString("output-path")),
@@ -137,7 +136,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
new file mode 100644
index 00000000..d73d14f5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.env
+
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import java.io.File
+import java.io.FileInputStream
+import java.io.InputStream
+import java.util.*
+
+/**
+ * A [EnvironmentReader] for the internal environment format.
+ *
+ * @param input The input stream describing the physical cluster.
+ */
+class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
+ /**
+ * Construct a [ClusterEnvironmentReader] for the specified [file].
+ */
+ constructor(file: File) : this(FileInputStream(file))
+
+ override fun read(): List<MachineDef> {
+ var clusterIdCol = 0
+ var speedCol = 0
+ var numberOfHostsCol = 0
+ var memoryPerHostCol = 0
+ var coresPerHostCol = 0
+
+ var clusterIdx = 0
+ var clusterId: String
+ var speed: Double
+ var numberOfHosts: Int
+ var memoryPerHost: Long
+ var coresPerHost: Int
+
+ val nodes = mutableListOf<MachineDef>()
+ val random = Random(0)
+
+ input.bufferedReader().use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the file
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(";")
+
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ clusterIdCol = header["ClusterID"]!!
+ speedCol = header["Speed"]!!
+ numberOfHostsCol = header["numberOfHosts"]!!
+ memoryPerHostCol = header["memoryCapacityPerHost"]!!
+ coresPerHostCol = header["coreCountPerHost"]!!
+ return@forEachIndexed
+ }
+
+ clusterIdx++
+ clusterId = values[clusterIdCol].trim()
+ speed = values[speedCol].trim().toDouble() * 1000.0
+ numberOfHosts = values[numberOfHostsCol].trim().toInt()
+ memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
+ coresPerHost = values[coresPerHostCol].trim().toInt()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+
+ repeat(numberOfHosts) {
+ nodes.add(
+ MachineDef(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$it",
+ mapOf("cluster" to clusterId),
+ MachineModel(
+ List(coresPerHost) { coreId ->
+ ProcessingUnit(unknownProcessingNode, coreId, speed)
+ },
+ listOf(unknownMemoryUnit)
+ ),
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearPowerModel(350.0, idlePower = 200.0)
+ )
+ )
+ }
+ }
+ }
+
+ return nodes
+ }
+
+ override fun close() {
+ input.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 7f25137e..5ad75565 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -26,21 +26,17 @@ import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.TreeSet
/**
* A [TraceReader] for the internal VM workload trace format.
*
- * @param reader The internal trace reader to use.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- * @param run The run to which this reader belongs.
+ * @param rawReaders The internal raw trace readers to use.
+ * @param workload The workload to read.
+ * @param seed The seed to use for sampling.
*/
-public class Sc20ParquetTraceReader(
- rawReaders: List<Sc20RawParquetTraceReader>,
- performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+public class ParquetTraceReader(
+ rawReaders: List<RawParquetTraceReader>,
workload: Workload,
seed: Int
) : TraceReader<SimWorkload> {
@@ -59,20 +55,6 @@ public class Sc20ParquetTraceReader(
}
.map { sampleWorkload(it.first, workload, it.second, seed) }
.flatten()
- .run {
- // Apply performance interference model
- if (performanceInterferenceModel.isEmpty())
- this
- else {
- map { entry ->
- val id = entry.name
- val relevantPerformanceInterferenceModelItems =
- performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
-
- entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
- }
- }
- }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
new file mode 100644
index 00000000..a19f5699
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+class PerformanceInterferenceReader(
+ private val input: InputStream,
+ private val mapper: ObjectMapper = jacksonObjectMapper()
+) : AutoCloseable {
+ init {
+ mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
+ }
+
+ /**
+ * Read the performance interface model from the input.
+ */
+ fun read(): List<VmInterferenceGroup> {
+ return mapper.readValue(input)
+ }
+
+ override fun close() {
+ input.close()
+ }
+
+ private data class GroupMixin(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 54151c9f..94193780 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin.trace
-import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -32,14 +31,12 @@ import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.util.UUID
-private val logger = KotlinLogging.logger {}
-
/**
* A [TraceReader] for the internal VM workload trace format.
*
* @param path The directory of the traces.
*/
-public class Sc20RawParquetTraceReader(private val path: File) {
+class RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
@@ -136,14 +133,5 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<SimWorkload>> = entries
-
- /**
- * Create a [TraceReader] instance.
- */
- public fun createReader(): TraceReader<SimWorkload> {
- return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() {
- override fun close() {}
- }
- }
+ fun read(): List<TraceEntry<SimWorkload>> = entries
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 6792c2ab..a3b45f47 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -33,8 +33,6 @@ import org.apache.parquet.io.api.Binary
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.format.util.LocalInputFile
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
@@ -44,7 +42,6 @@ import java.util.TreeSet
import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -52,14 +49,9 @@ private val logger = KotlinLogging.logger {}
* A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
* @param traceFile The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param selectedVms The list of VMs to read from the trace.
*/
-public class Sc20StreamingParquetTraceReader(
- traceFile: File,
- performanceInterferenceModel: PerformanceInterferenceModel? = null,
- selectedVms: List<String> = emptyList(),
- random: Random
-) : TraceReader<SimWorkload> {
+class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
@@ -227,14 +219,6 @@ public class Sc20StreamingParquetTraceReader(
buffers.remove(id)
}
- val relevantPerformanceInterferenceModelItems =
- if (performanceInterferenceModel != null)
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
- Random(random.nextInt())
- )
- else
- null
val workload = SimTraceWorkload(fragments)
val meta = mapOf(
"cores" to maxCores,
@@ -242,13 +226,7 @@ public class Sc20StreamingParquetTraceReader(
"workload" to workload
)
- TraceEntry(
- uid, id, submissionTime, workload,
- if (performanceInterferenceModel != null)
- meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
- else
- meta
- )
+ TraceEntry(uid, id, submissionTime, workload, meta)
}
.sortedBy { it.start }
.toList()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index d0031a66..7cd1f159 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -41,7 +41,6 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.sc20.Sc20VmPlacementReader
import org.opendc.format.util.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -53,7 +52,7 @@ import kotlin.math.min
/**
* Represents the command for converting traces
*/
-public class TraceConverterCli : CliktCommand(name = "trace-converter") {
+class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The directory where the trace should be stored.
*/
@@ -149,24 +148,24 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The supported trace conversions.
*/
-public sealed class TraceConversion(name: String) : OptionGroup(name) {
+sealed class TraceConversion(name: String) : OptionGroup(name) {
/**
* Read the fragments of the trace.
*/
- public abstract fun read(
+ abstract fun read(
traceDirectory: File,
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment>
}
-public class SolvinityConversion : TraceConversion("Solvinity") {
+class SolvinityConversion : TraceConversion("Solvinity") {
private val clusters by option()
.split(",")
private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
.file(canBeDir = false)
- .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } }
.required()
override fun read(
@@ -335,7 +334,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") {
/**
* Conversion of the Bitbrains public trace.
*/
-public class BitbrainsConversion : TraceConversion("Bitbrains") {
+class BitbrainsConversion : TraceConversion("Bitbrains") {
override fun read(
traceDirectory: File,
metaSchema: Schema,
@@ -447,7 +446,7 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") {
/**
* Conversion of the Azure public VM trace.
*/
-public class AzureConversion : TraceConversion("Azure") {
+class AzureConversion : TraceConversion("Azure") {
private val seed by option(help = "seed for trace sampling")
.long()
.default(0)
@@ -604,18 +603,18 @@ public class AzureConversion : TraceConversion("Azure") {
}
}
-public data class Fragment(
- public val id: String,
- public val tick: Long,
- public val flops: Long,
- public val duration: Long,
- public val usage: Double,
- public val cores: Int
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
)
-public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
new file mode 100644
index 00000000..7a1683f0
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import java.io.InputStream
+
+/**
+ * A parser for the JSON VM placement data files used for the TPDS article on Capelin.
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+public class VmPlacementReader(
+ private val input: InputStream,
+ private val mapper: ObjectMapper = jacksonObjectMapper()
+) : AutoCloseable {
+ /**
+ * Read the VM placements from the input.
+ */
+ public fun read(): Map<String, String> {
+ return mapper.readValue<Map<String, String>>(input)
+ .mapKeys { "vm__workload__${it.key}.txt" }
+ .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
+ }
+
+ override fun close() {
+ input.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 4b21b4f7..08e04ddf 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
@@ -161,9 +161,8 @@ class CapelinIntegrationTest {
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
+ return ParquetTraceReader(
+ listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
Workload("test", fraction),
seed
)
@@ -173,8 +172,8 @@ class CapelinIntegrationTest {
* Obtain the environment reader for the test.
*/
private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
+ return ClusterEnvironmentReader(stream)
}
class TestExperimentReporter : ExperimentMonitor {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
new file mode 100644
index 00000000..9b1513dc
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+
+/**
+ * Test suite for the [PerformanceInterferenceReader] class.
+ */
+class PerformanceInterferenceReaderTest {
+ @Test
+ fun testSmoke() {
+ val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
+ val reader = PerformanceInterferenceReader(input)
+
+ val result = reader.use { reader.read() }
+
+ assertAll(
+ { assertEquals(2, result.size) },
+ { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
+ { assertEquals(0.0, result[0].targetLoad, 0.001) },
+ { assertEquals(0.8830158730158756, result[0].score, 0.001) }
+ )
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
new file mode 100644
index 00000000..1be5852b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
@@ -0,0 +1,22 @@
+[
+ {
+ "vms": [
+ "vm_a",
+ "vm_c",
+ "vm_x",
+ "vm_y"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "vm_a",
+ "vm_b",
+ "vm_c",
+ "vm_d"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]