summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
commit6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch)
tree74e8993babb28dd56950f1da69eda2d97735a0e8 /opendc/opendc-compute/src
parent60372f0022d423efd5267ef4008d9afcbe870911 (diff)
parent3e056406616860c77168d827f1ca9d8d3c79c08e (diff)
Merge branch 'bug/experiment-issues' into '2.x'
Address issues during experiments See merge request opendc/opendc-simulator!61
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt129
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt64
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt44
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt7
6 files changed, 203 insertions, 49 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index b0688f99..36bbfa45 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -14,7 +14,7 @@ class VmImage(
public override val uid: UUID,
public override val name: String,
public override val tags: TagContainer,
- public val flopsHistory: List<FlopsHistoryFragment>,
+ public val flopsHistory: Sequence<FlopsHistoryFragment>,
public val maxCores: Int,
public val requiredMemory: Long
) : Image {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
new file mode 100644
index 00000000..45024a49
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
@@ -0,0 +1,129 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core.workload
+
+import com.atlarge.opendc.compute.core.Server
+import java.util.SortedSet
+import java.util.TreeSet
+import kotlin.random.Random
+
+/**
+ * Meta-data key for the [PerformanceInterferenceModel] of an image.
+ */
+const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
+
+/**
+ * Performance Interference Model describing the variability incurred by different sets of workloads if colocated.
+ *
+ * @param items The [PerformanceInterferenceModelItem]s that make up this model.
+ */
+class PerformanceInterferenceModel(
+ items: Set<PerformanceInterferenceModelItem>,
+ val random: Random = Random(0)
+) {
+ private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
+ private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
+ var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ lhs.hashCode().compareTo(rhs.hashCode())
+ }
+ val items = TreeSet(comparator)
+ private val colocatedWorkloads = TreeSet<String>()
+
+ init {
+ this.items.addAll(items)
+ }
+
+ fun vmStarted(server: Server) {
+ colocatedWorkloads.add(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ fun vmStopped(server: Server) {
+ colocatedWorkloads.remove(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean {
+ var count = 0
+ for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) {
+ if (name in colocatedWorkloads)
+ count++
+ if (count > 1)
+ return true
+ }
+ return false
+ }
+
+ fun apply(currentServerLoad: Double): Double {
+ if (intersectingItems.isEmpty()) {
+ return 1.0
+ }
+ val score = intersectingItems
+ .firstOrNull { it.minServerLoad <= currentServerLoad }
+
+ // Apply performance penalty to (on average) only one of the VMs
+ return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
+ score.performanceScore
+ } else {
+ 1.0
+ }
+ }
+}
+
+/**
+ * Model describing how a specific set of workloads causes performance variability for each workload.
+ *
+ * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set.
+ * @param minServerLoad The minimum total server load at which this interference is activated and noticeable.
+ * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no
+ * influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+data class PerformanceInterferenceModelItem(
+ val workloadNames: SortedSet<String>,
+ val minServerLoad: Double,
+ val performanceScore: Double
+) {
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (javaClass != other?.javaClass) return false
+
+ other as PerformanceInterferenceModelItem
+
+ if (workloadNames != other.workloadNames) return false
+
+ return true
+ }
+
+ override fun hashCode(): Int = workloadNames.hashCode()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 844938db..08f04760 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -120,7 +120,7 @@ public class SimpleBareMetalDriver(
/**
* The internal random instance.
*/
- private val random = Random(0)
+ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
override suspend fun init(): Node = withContext(domain.coroutineContext) {
nodeState.value
@@ -134,7 +134,7 @@ public class SimpleBareMetalDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()),
+ UUID(random.nextLong(), random.nextLong()),
node.name,
emptyMap(),
flavor,
@@ -151,7 +151,7 @@ public class SimpleBareMetalDriver(
override suspend fun stop(): Node = withContext(domain.coroutineContext) {
val node = nodeState.value
- if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) {
+ if (node.state == NodeState.SHUTOFF) {
return@withContext node
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 73f9dd5c..53fa463b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -40,31 +40,26 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
-import kotlinx.coroutines.CancellableContinuation
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.filter
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
+import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -101,15 +96,6 @@ class SimpleVirtDriver(
override val events: Flow<HypervisorEvent> = eventFlow
init {
- events.filter { it is HypervisorEvent.VmsUpdated }.onEach {
- val imagesRunning = vms.map { it.server.image }.toSet()
- vms.forEach {
- val performanceModel =
- it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
- performanceModel?.computeIntersectingItems(imagesRunning)
- }
- }.launchIn(this)
-
launch {
try {
scheduler()
@@ -140,6 +126,7 @@ class SimpleVirtDriver(
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events, simulationContext.domain))
+ vmStarted(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
@@ -148,6 +135,22 @@ class SimpleVirtDriver(
eventFlow.close()
}
+ private fun vmStarted(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStarted(server)
+ }
+ }
+
+ private fun vmStopped(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStopped(server)
+ }
+ }
+
/**
* A scheduling command processed by the scheduler.
*/
@@ -240,7 +243,7 @@ class SimpleVirtDriver(
}
// XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
- duration = max(300.0, ceil(duration))
+ duration = 300.0
val totalAllocatedUsage = maxUsage - availableUsage
var totalAllocatedBurst = 0L
@@ -325,14 +328,14 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.cont?.resume(Unit)
+ vm.chan?.resume(Unit)
}
}
eventFlow.emit(
HypervisorEvent.SliceFinished(
this@SimpleVirtDriver,
- totalRequestedSubBurst,
+ totalRequestedBurst,
min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
totalOvercommissionedBurst,
totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
@@ -371,7 +374,7 @@ class SimpleVirtDriver(
val vm: VmServerContext,
val vcpu: ProcessingUnit,
var burst: Long,
- val limit: Double
+ var limit: Double
) {
/**
* The usage that was actually granted.
@@ -395,7 +398,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- var cont: CancellableContinuation<Unit>? = null
+ var chan: Continuation<Unit>? = null
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -443,6 +446,7 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
+ vmStopped(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
events.close()
}
@@ -451,6 +455,7 @@ class SimpleVirtDriver(
require(burst.size == limit.size) { "Array dimensions do not match" }
this.deadline = deadline
this.burst = burst
+
val requests = cpus.asSequence()
.take(burst.size)
.mapIndexed { i, cpu ->
@@ -465,16 +470,13 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
- schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- suspendCancellableCoroutine<Unit> { cont = it }
+ schedulingQueue.offer(SchedulerCommand.Schedule(this, requests))
+ suspendCoroutine<Unit> { chan = it }
} catch (e: CancellationException) {
// Deschedule the VM
- withContext(NonCancellable) {
- requests.forEach { it.isCancelled = true }
- schedulingQueue.send(SchedulerCommand.Interrupt)
- suspendCancellableCoroutine<Unit> { cont = it }
- }
-
+ requests.forEach { it.isCancelled = true }
+ schedulingQueue.offer(SchedulerCommand.Interrupt)
+ suspendCoroutine<Unit> { chan = it }
e.assertFailure()
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 6200ad7c..520f6dc5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,7 +1,6 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
@@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -28,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
+import kotlin.math.max
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
@@ -55,7 +54,14 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow()
+ public var submittedVms = 0L
+ public var queuedVms = 0L
+ public var runningVms = 0L
+ public var finishedVms = 0L
+ public var unscheduledVms = 0L
+
+ private var maxCores = 0
+ private var maxMemory = 0L
/**
* The allocation logic to use.
@@ -87,6 +93,8 @@ class SimpleVirtProvisioningService(
image: Image,
flavor: Flavor
): Server = withContext(coroutineContext) {
+ submittedVms++
+ queuedVms++
suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
@@ -121,15 +129,24 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val log = simulationContext.log
+ val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
val requiredMemory = (imageInstance.image as VmImage).requiredMemory
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
+ val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
+
+ if (selectedHv == null) {
+ if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
+ unscheduledVms++
+ println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+ }
+
+ break
+ }
try {
- log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+ println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}")
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -145,6 +162,8 @@ class SimpleVirtProvisioningService(
)
imageInstance.server = server
imageInstance.continuation.resume(server)
+ queuedVms--
+ runningVms++
activeImages += imageInstance
server.events
@@ -152,6 +171,10 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
+ println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}")
+ runningVms--
+ finishedVms++
+
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -170,6 +193,8 @@ class SimpleVirtProvisioningService(
selectedHv.numberOfActiveServers--
selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
selectedHv.availableMemory += requiredMemory
+ } catch (e: Throwable) {
+ e.printStackTrace()
}
}
}
@@ -188,13 +213,18 @@ class SimpleVirtProvisioningService(
server.flavor.memorySize,
0
)
+ maxCores = max(maxCores, server.flavor.cpuCount)
+ maxMemory = max(maxMemory, server.flavor.memorySize)
hypervisors[server] = hv
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- requestCycle()
+
+ if (incomingImages.isNotEmpty()) {
+ requestCycle()
+ }
}
else -> throw IllegalStateException()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 550048a4..2ad7df84 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
-import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
@@ -18,11 +16,6 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
- * The events emitted by the hypervisors.
- */
- public val hypervisorEvents: Flow<HypervisorEvent>
-
- /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>