summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 23:36:39 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 23:36:39 +0200
commitefd62bbc3bbe236503095e5cb27af42423500d85 (patch)
tree47f9a6ed78870d889643b586a278997b969c0e71
parent0ffe0e98a0617bf5a9524fe806ac43eeebd5d7ce (diff)
feat: Add workload sampling
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt12
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt32
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt4
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt62
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt)24
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt26
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml3
8 files changed, 131 insertions, 38 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 6e6da2c8..5e16b5e6 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
-import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
import com.atlarge.opendc.format.environment.EnvironmentReader
@@ -44,7 +44,6 @@ import java.io.File
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
-import kotlin.random.Random
import kotlin.system.measureTimeMillis
/**
@@ -132,14 +131,13 @@ public class ExperimentRunner(
private fun createTraceReader(
name: String,
performanceInterferenceModel: PerformanceInterferenceModel?,
- seed: Int
+ run: Run
): TraceReader<VmWorkload> {
val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) }
- return Sc20FilteringParquetTraceReader(
+ return Sc20ParquetTraceReader(
raw,
performanceInterferenceModel,
- emptySet(),
- Random(seed)
+ run
)
}
@@ -155,7 +153,7 @@ public class ExperimentRunner(
*/
private fun run(run: Run) {
val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
- val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed)
+ val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
val environmentReader = createEnvironmentReader(run.scenario.topology.name)
try {
run.scenario(run, reporter, environmentReader, traceReader)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index 9dfed03b..d5990c01 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -43,6 +43,7 @@ import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
import com.zaxxer.hikari.HikariDataSource
import mu.KotlinLogging
import java.io.File
@@ -83,7 +84,6 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
.file()
.convert { it.inputStream() as InputStream }
- .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") }
/**
* The path to the original VM placements file.
@@ -109,21 +109,39 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
private val portfolios by option("--portfolio")
.choice(
"hor-ver" to HorVerPortfolio,
- "more-velocitory" to MoreVelocityPortfolio,
+ "more-velocity" to MoreVelocityPortfolio,
"more-hpc" to MoreHpcPortfolio,
"operational-phenomena" to OperationalPhenomenaPortfolio,
ignoreCase = true
)
.multiple()
+ /**
+ * The maximum number of threads to use.
+ */
+ private val parallelism by option("--parallelism")
+ .int()
+ .default(Runtime.getRuntime().availableProcessors())
+
+ /**
+ * The batch size for writing results.
+ */
+ private val batchSize by option("--batch-size")
+ .int()
+ .default(4096)
+
override fun run() {
val ds = HikariDataSource()
ds.jdbcUrl = jdbcUrl
ds.addDataSourceProperty("reWriteBatchedInserts", "true")
- val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
- .construct()
- val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel)
+
+ reporter.batchSize = batchSize
+
+ val performanceInterferenceModel =
+ performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() }
+
+ val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, parallelism)
try {
runner.run()
@@ -138,6 +156,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
* An option for specifying the type of reporter to use.
*/
internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider {
+ var batchSize = 4096
+
class Parquet : Reporter("Options for reporting using Parquet") {
private val path by option("--parquet-directory", help = "path to where the output should be stored")
.file()
@@ -153,7 +173,7 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
lateinit var hostWriter: PostgresHostMetricsWriter
override fun init(ds: DataSource) {
- hostWriter = PostgresHostMetricsWriter(ds, 4096)
+ hostWriter = PostgresHostMetricsWriter(ds, batchSize)
}
override fun createReporter(scenario: Long, run: Int): ExperimentReporter =
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
index 58acd168..04bddce3 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
@@ -30,7 +30,7 @@ abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) {
abstract val operationalPhenomena: List<Pair<Boolean, Boolean>>
abstract val allocationPolicies: List<String>
- open val repetitions = 2
+ open val repetitions = 4
override val scenarios: Sequence<Scenario> = sequence {
for (topology in topologies) {
@@ -72,7 +72,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
// Workload("solvinity", 0.1),
// Workload("solvinity", 0.25),
// Workload("small-parquet", 0.5),
- Workload("small-parquet", 1.0)
+ Workload("small-parquet", 0.5)
)
override val operationalPhenomena = listOf(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
new file mode 100644
index 00000000..6089271e
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.format.trace.TraceEntry
+import kotlin.random.Random
+
+/**
+ * Sample the workload for the specified [run].
+ */
+fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> {
+ return sampleRegularWorkload(trace, run)
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> {
+ if (run.scenario.workload.fraction >= 1) {
+ return trace
+ }
+
+ val shuffled = trace.shuffled(Random(run.seed))
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ var currentLoad = 0.0
+
+ for (entry in shuffled) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > run.scenario.workload.fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ return res
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
index a47258b4..bfa89e3a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
@@ -43,7 +43,7 @@ public abstract class PostgresMetricsWriter<T>(
/**
* The queue of commands to process.
*/
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(batchSize)
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(12 * batchSize)
/**
* The thread for the actual writer.
@@ -93,9 +93,8 @@ public abstract class PostgresMetricsWriter<T>(
if (queue.isEmpty()) {
actions.add(queue.take())
- } else {
- queue.drainTo(actions)
}
+ queue.drainTo(actions)
for (action in actions) {
when (action) {
@@ -110,6 +109,7 @@ public abstract class PostgresMetricsWriter<T>(
stmt.executeBatch()
conn.commit()
}
+
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index 1e9950de..7cc713bc 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -28,40 +28,37 @@ import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.Run
+import com.atlarge.opendc.experiments.sc20.sampleWorkload
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import kotlin.random.Random
/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
+ * A [TraceReader] for the internal VM workload trace format.
*
- * @param traceFile The directory of the traces.
+ * @param reader The internal trace reader to use.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param run The run to which this reader belongs.
*/
@OptIn(ExperimentalStdlibApi::class)
-class Sc20FilteringParquetTraceReader(
+class Sc20ParquetTraceReader(
raw: Sc20RawParquetTraceReader,
performanceInterferenceModel: PerformanceInterferenceModel?,
- selectedVms: Set<String>,
- random: Random
+ run: Run
) : TraceReader<VmWorkload> {
/**
* The iterator over the actual trace.
*/
private val iterator: Iterator<TraceEntry<VmWorkload>> =
raw.read()
- .run {
- // Apply VM selection filter
- if (selectedVms.isEmpty())
- this
- else
- filter { it.workload.image.name in selectedVms }
- }
+ .run { sampleWorkload(this, run) }
.run {
// Apply performance interference model
if (performanceInterferenceModel == null)
this
- else
+ else {
+ val random = Random(run.seed)
map { entry ->
val image = entry.workload.image
val id = image.name
@@ -82,6 +79,7 @@ class Sc20FilteringParquetTraceReader(
val newWorkload = entry.workload.copy(image = newImage)
Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
}
+ }
}
.iterator()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 632746a2..f19c9275 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -94,29 +94,36 @@ class Sc20RawParquetTraceReader(private val path: File) {
var counter = 0
val entries = mutableListOf<TraceEntryImpl>()
+ val loadCache = mutableListOf<LoadCacheEntry>()
return try {
while (true) {
val record = metaReader.read() ?: break
val id = record["id"].toString()
val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
val maxCores = record["maxCores"] as Int
val requiredMemory = record["requiredMemory"] as Long
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+ val vmFragments = fragments.getValue(id).asSequence()
+ val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val vmWorkload = VmWorkload(
- uid, "VM Workload $id",
+ uid, id,
UnnamedUser,
VmImage(
uid,
id,
- emptyMap(),
- fragments.getValue(id).asSequence(),
+ mapOf(
+ "submit-time" to submissionTime,
+ "end-time" to endTime,
+ "total-load" to totalLoad
+ ),
+ vmFragments,
maxCores,
requiredMemory
)
)
-
entries.add(TraceEntryImpl(submissionTime, vmWorkload))
}
@@ -129,17 +136,17 @@ class Sc20RawParquetTraceReader(private val path: File) {
/**
* The entries in the trace.
*/
- private val entries: Sequence<TraceEntryImpl>
+ private val entries: List<TraceEntryImpl>
init {
val fragments = parseFragments(path)
- entries = parseMeta(path, fragments).asSequence()
+ entries = parseMeta(path, fragments)
}
/**
* Read the entries in the trace.
*/
- public fun read(): Sequence<TraceEntry<VmWorkload>> = entries
+ public fun read(): List<TraceEntry<VmWorkload>> = entries
/**
* An unnamed user.
@@ -156,4 +163,9 @@ class Sc20RawParquetTraceReader(private val path: File) {
override var submissionTime: Long,
override val workload: VmWorkload
) : TraceEntry<VmWorkload>
+
+ /**
+ * A load cache entry.
+ */
+ data class LoadCacheEntry(val vm: String, val totalLoad: Double, val start: Long, val end: Long)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
index f9a5a79e..f47a6da8 100644
--- a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
+++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
@@ -36,6 +36,9 @@
<Logger name="com.atlarge.opendc" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
+ <Logger name="com.atlarge.opendc.experiments.sc20" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>