summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 00:45:37 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 00:45:37 +0200
commit880c0783d7e20d9b082227a5cea685bfd76e4920 (patch)
treea8b0eb3bea29ca0febf2cc58063222c6cf5e2b0e /opendc
parent0068fd4e3c47e9bfb2b61c7c90527a2ccda3952d (diff)
perf: Various performance improvements
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt16
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt32
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt11
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt12
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt34
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt3
8 files changed, 89 insertions, 56 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
index 61b5759d..ac43b6ac 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
@@ -143,17 +144,19 @@ suspend fun createProvisioner(
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) {
val domain = simulationContext.domain
+ val clock = simulationContext.clock
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
hypervisor.server.events
.onEach { event ->
+ val time = clock.millis()
when (event) {
is ServerEvent.StateChanged -> {
- reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(time, hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
}
}
@@ -179,6 +182,11 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
}
}
.launchIn(domain)
+
+ val driver = hypervisor.server.services[BareMetalDriver.Key]
+ driver.powerDraw
+ .onEach { reporter.reportPowerConsumption(hypervisor.server, it) }
+ .launchIn(domain)
}
}
@@ -222,8 +230,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
// Monitor server events
server.events
.onEach {
+ val time = simulationContext.clock.millis()
+
if (it is ServerEvent.StateChanged) {
- reporter.reportVmStateChange(it.server)
+ reporter.reportVmStateChange(time, it.server)
}
delay(1)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 5e16b5e6..9455cb9d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -133,7 +133,7 @@ public class ExperimentRunner(
performanceInterferenceModel: PerformanceInterferenceModel?,
run: Run
): TraceReader<VmWorkload> {
- val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) }
+ val raw = rawTraceReaders.getValue(name)
return Sc20ParquetTraceReader(
raw,
performanceInterferenceModel,
@@ -149,20 +149,6 @@ public class ExperimentRunner(
}
/**
- * Run the specified run.
- */
- private fun run(run: Run) {
- val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
- val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
- val environmentReader = createEnvironmentReader(run.scenario.topology.name)
- try {
- run.scenario(run, reporter, environmentReader, traceReader)
- } finally {
- reporter.close()
- }
- }
-
- /**
* Run the portfolios.
*/
@OptIn(ExperimentalStdlibApi::class)
@@ -179,6 +165,12 @@ public class ExperimentRunner(
val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!!
for (run in plan) {
val scenarioId = scenarioIds[run.scenario]!!
+
+ rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name ->
+ logger.info { "Loading trace $name" }
+ Sc20RawParquetTraceReader(File(tracePath, name))
+ }
+
launch(dispatcher) {
launch(mainDispatcher) {
helper.startRun(scenarioId, run.id)
@@ -189,7 +181,15 @@ public class ExperimentRunner(
try {
val duration = measureTimeMillis {
- run(run)
+ val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
+ val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
+ val environmentReader = createEnvironmentReader(run.scenario.topology.name)
+
+ try {
+ run.scenario(run, reporter, environmentReader, traceReader)
+ } finally {
+ reporter.close()
+ }
}
finished.incrementAndGet()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
index f2ac84a1..963d47e9 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
@@ -72,7 +72,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
// Workload("solvinity", 0.1),
// Workload("solvinity", 0.25),
// Workload("small-parquet", 0.5),
- Workload("small-parquet", 1.0)
+ Workload("full-traces", 0.10)
)
override val operationalPhenomena = listOf(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
index 6089271e..bb3466ba 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
@@ -26,8 +26,11 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.format.trace.TraceEntry
+import mu.KotlinLogging
import kotlin.random.Random
+private val logger = KotlinLogging.logger {}
+
/**
* Sample the workload for the specified [run].
*/
@@ -39,7 +42,8 @@ fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEnt
* Sample a regular (non-HPC) workload.
*/
fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> {
- if (run.scenario.workload.fraction >= 1) {
+ val fraction = run.scenario.workload.fraction
+ if (fraction >= 1) {
return trace
}
@@ -50,7 +54,7 @@ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<T
for (entry in shuffled) {
val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > run.scenario.workload.fraction) {
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -58,5 +62,8 @@ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<T
res += entry
}
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+
return res
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
index 0403a3b5..1c752cb1 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
@@ -35,12 +35,13 @@ interface ExperimentReporter : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
- suspend fun reportVmStateChange(server: Server) {}
+ fun reportVmStateChange(time: Long, server: Server) {}
/**
* This method is invoked when the state of a host changes.
*/
- suspend fun reportHostStateChange(
+ fun reportHostStateChange(
+ time: Long,
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -50,9 +51,14 @@ interface ExperimentReporter : Closeable {
) {}
/**
+ * Report the power consumption of a host.
+ */
+ fun reportPowerConsumption(host: Server, draw: Double)
+
+ /**
* This method is invoked for a host for each slice that is finishes.
*/
- suspend fun reportHostSlice(
+ fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
index 6b3351d4..58d384e7 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
@@ -27,9 +27,7 @@ package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.flow.first
import mu.KotlinLogging
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
@@ -87,9 +85,10 @@ class ExperimentParquetReporter(destination: File) :
}
}
- override suspend fun reportVmStateChange(server: Server) {}
+ override fun reportVmStateChange(time: Long, server: Server) {}
- override suspend fun reportHostStateChange(
+ override fun reportHostStateChange(
+ time: Long,
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -97,11 +96,13 @@ class ExperimentParquetReporter(destination: File) :
runningVms: Long,
finishedVms: Long
) {
+ logger.info("Host ${server.uid} changed state ${server.state} [$time]")
+
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = simulationContext.clock.millis() - lastServerState.second
+ val duration = time - lastServerState.second
reportHostSlice(
- simulationContext.clock.millis(),
+ time,
0,
0,
0,
@@ -116,14 +117,21 @@ class ExperimentParquetReporter(destination: File) :
finishedVms,
duration
)
+
+ lastServerStates.remove(server)
+ lastPowerConsumption.remove(server)
+ } else {
+ lastServerStates[server] = Pair(server.state, time)
}
+ }
- logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
- lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
}
- override suspend fun reportHostSlice(
+ override fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -139,11 +147,6 @@ class ExperimentParquetReporter(destination: File) :
finishedVms: Long,
duration: Long
) {
- // Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.services[BareMetalDriver.Key]
- val usage = driver.usage.first()
- val powerDraw = driver.powerDraw.first()
-
val record = GenericData.Record(schema)
record.put("time", time)
record.put("duration", duration)
@@ -156,8 +159,8 @@ class ExperimentParquetReporter(destination: File) :
record.put("image_count", numberOfDeployedImages)
record.put("server", hostServer.uid)
record.put("host_state", hostServer.state)
- record.put("host_usage", usage)
- record.put("power_draw", powerDraw)
+ record.put("host_usage", cpuUsage)
+ record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0)
record.put("total_submitted_vms", submittedVms)
record.put("total_queued_vms", queuedVms)
record.put("total_running_vms", runningVms)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
index 5de3535d..a92278d8 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
@@ -27,9 +27,7 @@ package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.flow.first
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
@@ -37,9 +35,10 @@ private val logger = KotlinLogging.logger {}
class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- override suspend fun reportVmStateChange(server: Server) {}
+ override fun reportVmStateChange(time: Long, server: Server) {}
- override suspend fun reportHostStateChange(
+ override fun reportHostStateChange(
+ time: Long,
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -48,10 +47,12 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
finishedVms: Long
) {
val lastServerState = lastServerStates[server]
+ logger.debug("Host ${server.uid} changed state ${server.state} [$time]")
+
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = simulationContext.clock.millis() - lastServerState.second
+ val duration = time - lastServerState.second
reportHostSlice(
- simulationContext.clock.millis(),
+ time,
0,
0,
0,
@@ -66,14 +67,23 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
finishedVms,
duration
)
+
+ lastServerStates.remove(server)
+ lastPowerConsumption.remove(server)
+ } else {
+ lastServerStates[server] = Pair(server.state, time)
}
+ }
+
- logger.debug("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
- lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
}
- override suspend fun reportHostSlice(
+
+ override fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -89,10 +99,6 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
finishedVms: Long,
duration: Long
) {
- // Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.services[BareMetalDriver.Key]
- val powerDraw = driver.powerDraw.first()
-
writer.write(
scenario, run, HostMetrics(
time,
@@ -105,7 +111,7 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
interferedBurst,
cpuUsage,
cpuDemand,
- powerDraw
+ lastPowerConsumption[hostServer] ?: 200.0
)
)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index f19c9275..485c2922 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -94,7 +94,6 @@ class Sc20RawParquetTraceReader(private val path: File) {
var counter = 0
val entries = mutableListOf<TraceEntryImpl>()
- val loadCache = mutableListOf<LoadCacheEntry>()
return try {
while (true) {
@@ -106,6 +105,8 @@ class Sc20RawParquetTraceReader(private val path: File) {
val requiredMemory = record["requiredMemory"] as Long
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+ logger.info { "VM $id" }
+
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val vmWorkload = VmWorkload(