summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-12 15:44:53 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-12 15:44:53 +0200
commit310daf42af741dee2f11d98eb929d2b6c0db141c (patch)
treea0570137b64651bfecd0d3a1d0e0383c2c327cea /opendc/opendc-compute/src
parent5f141c8b6aa6cfe96333f0cc02015e490b90fca6 (diff)
parent4a5ef5a41c8e008d5c09261de550d3f55eaa3348 (diff)
Merge branch 'bug/virt-driver-behavior' into '2.x'
Address multiple (performance) issues See merge request opendc/opendc-simulator!58
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt8
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt74
4 files changed, 58 insertions, 38 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 9ad88c17..b0688f99 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
+import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import java.util.UUID
@@ -19,17 +20,20 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
+ val clock = simulationContext.clock
+ val job = coroutineContext[Job]!!
+
for (fragment in flopsHistory) {
- coroutineContext.ensureActive()
+ job.ensureActive()
if (fragment.flops == 0L) {
delay(fragment.duration)
} else {
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage }
+ val usage = DoubleArray(cores) { fragment.usage / cores }
- ctx.run(burst, usage, simulationContext.clock.millis() + fragment.duration)
+ ctx.run(burst, usage, clock.millis() + fragment.duration)
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 8e15584a..844938db 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -62,6 +62,7 @@ import kotlin.math.min
import kotlinx.coroutines.withContext
import java.lang.Exception
import kotlin.coroutines.ContinuationInterceptor
+import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -116,6 +117,11 @@ public class SimpleBareMetalDriver(
override val powerDraw: Flow<Double> = powerModel(this)
+ /**
+ * The internal random instance.
+ */
+ private val random = Random(0)
+
override suspend fun init(): Node = withContext(domain.coroutineContext) {
nodeState.value
}
@@ -128,7 +134,7 @@ public class SimpleBareMetalDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID.randomUUID(),
+ UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()),
node.name,
emptyMap(),
flavor,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index 9ceb8bfc..7c088bc8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -60,6 +60,8 @@ public sealed class HypervisorEvent {
* it did not have the capacity.
* @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance
* interference.
+ * @property cpuUsage CPU use in megahertz.
+ * @property cpuDemand CPU demand in megahertz.
* @property numberOfDeployedImages The number of images deployed on this hypervisor.
*/
public data class SliceFinished(
@@ -68,6 +70,8 @@ public sealed class HypervisorEvent {
public val grantedBurst: Long,
public val overcommissionedBurst: Long,
public val interferedBurst: Long,
+ public val cpuUsage: Double,
+ public val cpuDemand: Double,
public val numberOfDeployedImages: Int,
public val hostServer: Server
) : HypervisorEvent()
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 5f15084d..d81b8825 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
-import java.lang.Exception
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
+import kotlin.coroutines.resume
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -180,7 +182,7 @@ class SimpleVirtDriver(
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
- val requests = TreeSet<CpuRequest>()
+ val requests = TreeSet(cpuRequestComparator)
val usage = DoubleArray(hostContext.cpus.size)
val burst = LongArray(hostContext.cpus.size)
@@ -237,7 +239,8 @@ class SimpleVirtDriver(
deadline = min(deadline, req.vm.deadline)
}
- duration = ceil(duration)
+ // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
+ duration = max(300.0, ceil(duration))
val totalAllocatedUsage = maxUsage - availableUsage
var totalAllocatedBurst = 0L
@@ -246,14 +249,14 @@ class SimpleVirtDriver(
// Divide the requests over the available capacity of the pCPUs fairly
for (i in pCPUs) {
- val remaining = hostContext.cpus.size - i
- val availableShare = availableUsage / remaining
- val grantedUsage = min(hostContext.cpus[i].frequency, availableShare)
- val pBurst = ceil(duration * grantedUsage).toLong()
+ val maxCpuUsage = hostContext.cpus[i].frequency
+ val fraction = maxCpuUsage / maxUsage
+ val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
+ val grantedBurst = ceil(duration * grantedUsage).toLong()
usage[i] = grantedUsage
- burst[i] = pBurst
- totalAllocatedBurst += pBurst
+ burst[i] = grantedBurst
+ totalAllocatedBurst += grantedBurst
availableUsage -= grantedUsage
}
@@ -308,9 +311,7 @@ class SimpleVirtDriver(
if (req.burst <= 0L || req.isCancelled) {
hasFinished = true
- }
-
- if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
+ } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
// Request must have its entire burst consumed or otherwise we have overcommission
// Note that we count the overcommissioned burst if the hypervisor has failed.
totalOvercommissionedBurst += req.burst
@@ -323,7 +324,7 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.chan.send(Unit)
+ vm.cont?.resume(Unit)
}
}
@@ -335,7 +336,9 @@ class SimpleVirtDriver(
min(totalRequestedBurst, totalAllocatedBurst),
min(totalRequestedBurst, totalGrantedBurst), // We can run more than requested due to timing
totalOvercommissionedBurst,
- totalInterferedBurst, // Might be smaller than zero due to FP rounding errors
+ totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
+ totalAllocatedUsage,
+ totalRequestedUsage,
vmCount, // Some VMs might already have finished, so keep initial VM count
server
)
@@ -344,6 +347,25 @@ class SimpleVirtDriver(
}
/**
+ * The [Comparator] for [CpuRequest].
+ */
+ private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs ->
+ var cmp = lhs.limit.compareTo(rhs.limit)
+
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid)
+
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ lhs.vcpu.id.compareTo(rhs.vcpu.id)
+ }
+
+ /**
* A request to schedule a virtual CPU on the host cpu.
*/
internal data class CpuRequest(
@@ -351,7 +373,7 @@ class SimpleVirtDriver(
val vcpu: ProcessingUnit,
var burst: Long,
val limit: Double
- ) : Comparable<CpuRequest> {
+ ) {
/**
* The usage that was actually granted.
*/
@@ -364,22 +386,6 @@ class SimpleVirtDriver(
override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu
override fun hashCode(): Int = Objects.hash(vm, vcpu)
-
- override fun compareTo(other: CpuRequest): Int {
- var cmp = limit.compareTo(other.limit)
-
- if (cmp != 0) {
- return cmp
- }
-
- cmp = vm.server.uid.compareTo(other.vm.server.uid)
-
- if (cmp != 0) {
- return cmp
- }
-
- return vcpu.id.compareTo(other.vcpu.id)
- }
}
internal inner class VmServerContext(
@@ -390,7 +396,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- var chan = Channel<Unit>(Channel.RENDEZVOUS)
+ var cont: CancellableContinuation<Unit>? = null
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -462,13 +468,13 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- chan.receive()
+ suspendCancellableCoroutine<Unit> { cont = it }
} catch (e: CancellationException) {
// Deschedule the VM
withContext(NonCancellable) {
requests.forEach { it.isCancelled = true }
schedulingQueue.send(SchedulerCommand.Interrupt)
- chan.receive()
+ suspendCancellableCoroutine<Unit> { cont = it }
}
e.assertFailure()