summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-15 16:12:25 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-15 17:25:41 +0200
commiteab4c190142f54291ed235e4e18f3a35385a541c (patch)
treea970a585d381ab26e76598a647612f083be3861b
parenta77829c7a8cf300a99a695423d85f905e0209286 (diff)
perf: Optimize trace loading for memory usage
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt22
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt1
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt2
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts1
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt21
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt9
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt71
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt2
11 files changed, 82 insertions, 57 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index b0688f99..b37f05a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -14,7 +14,7 @@ class VmImage(
public override val uid: UUID,
public override val name: String,
public override val tags: TagContainer,
- public val flopsHistory: List<FlopsHistoryFragment>,
+ public val flopsHistory: Sequence<FlopsHistoryFragment>,
public val maxCores: Int,
public val requiredMemory: Long
) : Image {
@@ -23,17 +23,19 @@ class VmImage(
val clock = simulationContext.clock
val job = coroutineContext[Job]!!
- for (fragment in flopsHistory) {
- job.ensureActive()
+ for (fragments in flopsHistory.chunked(1024)) {
+ for (fragment in fragments) {
+ job.ensureActive()
- if (fragment.flops == 0L) {
- delay(fragment.duration)
- } else {
- val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
- val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage / cores }
+ if (fragment.flops == 0L) {
+ delay(fragment.duration)
+ } else {
+ val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val usage = DoubleArray(cores) { fragment.usage / cores }
- ctx.run(burst, usage, clock.millis() + fragment.duration)
+ ctx.run(burst, usage, clock.millis() + fragment.duration)
+ }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
index ddf9bb33..45024a49 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
@@ -64,7 +64,7 @@ class PerformanceInterferenceModel(
this.items.addAll(items)
}
- fun vmStarted(server: Server) {
+ fun vmStarted(server: Server) {
colocatedWorkloads.add(server.image.name)
intersectingItems = items.filter { item -> doesMatch(item) }
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index e7344fa6..7c088bc8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -26,7 +26,6 @@ package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.service.VirtProvisioningService
/**
* An event that is emitted by a [VirtDriver].
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index f32f407c..2c25c0fa 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -42,27 +42,21 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
-import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 5ed58fee..09036d0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,7 +1,6 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
@@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 695f7274..2ad7df84 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
-import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index d23456a8..4b73cedd 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -32,6 +32,7 @@ plugins {
application {
mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
+ applicationDefaultJvmArgs = listOf("-Xmx3096M")
}
dependencies {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 464f4fc6..7c198e56 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -25,7 +25,7 @@ class Sc20Monitor(
.name("duration").type().longType().noDefault()
.name("requestedBurst").type().longType().noDefault()
.name("grantedBurst").type().longType().noDefault()
- .name("overcommisionedBurst").type().longType().noDefault()
+ .name("overcommissionedBurst").type().longType().noDefault()
.name("interferedBurst").type().longType().noDefault()
.name("cpuUsage").type().doubleType().noDefault()
.name("cpuDemand").type().doubleType().noDefault()
@@ -48,7 +48,14 @@ class Sc20Monitor(
suspend fun onVmStateChanged(server: Server) {}
- suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
+ suspend fun serverStateChanged(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
val duration = simulationContext.clock.millis() - lastServerState.second
@@ -62,10 +69,10 @@ class Sc20Monitor(
0.0,
0,
server,
- 0,
- 0,
- 0,
- 0,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
duration
)
}
@@ -101,7 +108,7 @@ class Sc20Monitor(
record.put("duration", duration)
record.put("requestedBurst", requestedBurst)
record.put("grantedBurst", grantedBurst)
- record.put("overcommisionedBurst", overcommissionedBurst)
+ record.put("overcommissionedBurst", overcommissionedBurst)
record.put("interferedBurst", interferedBurst)
record.put("cpuUsage", cpuUsage)
record.put("cpuDemand", cpuDemand)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 08815720..3aef80e6 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
@@ -174,20 +173,16 @@ fun main(args: Array<String>) {
delay(10)
val hypervisors = scheduler.drivers()
- var availableHypervisors = hypervisors.size
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server)
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
hypervisor.server.events
.onEach { event ->
when (event) {
is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server)
-
- if (event.server.state == ServerState.ERROR)
- availableHypervisors -= 1
+ monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
}
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 96daa2ce..d4eef029 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -37,6 +37,7 @@ import java.io.File
import java.io.FileReader
import java.util.UUID
import kotlin.math.max
+import kotlin.math.min
/**
* A [TraceReader] for the internal VM workload trace format.
@@ -81,10 +82,13 @@ class Sc20TraceReader(
vms
.forEachIndexed { idx, vmFile ->
println(vmFile)
- val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
+ var timestamp = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
@@ -96,33 +100,61 @@ class Sc20TraceReader(
val values = line.split(" ")
vmId = vmFile.name
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
- val cores = values[coreCol].trim().toInt()
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ }
+ }
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
- } else {
- // Restrict merging to empty fragments for now
- if (flopsHistory.last().flops == 0L && flops == 0L) {
- val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
- flopsHistory.add(
+ val flopsFragments = sequence {
+ var last: FlopsHistoryFragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ val res = ArrayList<FlopsHistoryFragment>(lines.size)
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
FlopsHistoryFragment(
oldFragment.tick,
oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
- cores)
- )
- } else {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
+ cores
+ )
+ } else {
+ val fragment =
+ FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ res.add(last!!)
+ }
+ fragment
+ }
}
+
+ yieldAll(res)
}
+
+ if (last != null) {
+ yield(last!!)
}
+ }
}
val uuid = UUID(0, idx.toLong())
@@ -131,20 +163,19 @@ class Sc20TraceReader(
PerformanceInterferenceModel(
performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet()
)
-
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
VmImage(
uuid,
vmId,
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsFragments,
maxCores,
requiredMemory
)
)
entries[uuid] = TraceEntryImpl(
- flopsHistory.firstOrNull()?.tick ?: -1,
+ minTime,
vmWorkload
)
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
index be9ddfaa..fe1049d9 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -134,7 +134,7 @@ class VmTraceReader(
uuid,
vmId.toString(),
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsHistory.asSequence(),
cores,
requiredMemory
)