authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-20 15:59:54 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-20 15:59:54 +0200
commit70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (patch)
tree10b4d6053d1cd58e921f71ff7b0d6f0cf7bab75a /opendc
parentee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 (diff)
parent21eafd32c45495ab9e8ebbeffbdbe1d43ffe566b (diff)
Merge branch 'perf/batch-slices' into '2.x'
Batch VM slices

See merge request opendc/opendc-simulator!70
Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/build.gradle.kts | 1
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt | 113
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt | 7
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt | 26
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 234
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 336
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 12
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt | 11
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt | 77
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt | 6
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt | 134
-rw-r--r--  opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 56
-rw-r--r--  opendc/opendc-experiments-sc20/src/test/resources/env/single.txt | 3
13 files changed, 772 insertions(+), 244 deletions(-)
diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts
index 7d43b064..acdcd5a7 100644
--- a/opendc/opendc-compute/build.gradle.kts
+++ b/opendc/opendc-compute/build.gradle.kts
@@ -36,6 +36,7 @@ dependencies {
implementation("io.github.microutils:kotlin-logging:1.7.9")
testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 663fa5e4..f770fa49 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -51,36 +51,113 @@ public interface ServerContext {
public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
/**
- * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
- * processing a **non-zero** burst or until the deadline is reached.
+ * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as
+ * specified by [triggerMode].
*
- * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
- * zero).
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen at any time during the execution of this method, and callers
+ * should not rely on their timing.
*
- * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
- * [IllegalArgumentException].
+ * @param slice The representation of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ */
+ public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST) =
+ select<Unit> { onRun(slice, triggerMode).invoke {} }
+
+ /**
+ * Ask the processor cores to run the specified [batch] of work slices and suspend execution until the trigger
+ * condition is met as specified by [triggerMode].
*
- * @param burst The burst time to request from each of the processor cores.
- * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen at any time during the execution of this method, and callers
+ * should not rely on their timing.
+ *
+ * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+ * slices with the next slice to be executed.
+ *
+ * @param batch The batch of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ * @param merge The merge function for consecutive slices, applied when the previous slice was not completed
+ * within its deadline.
*/
- public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
- select<Unit> { onRun(burst, limit, deadline).invoke {} }
- }
+ public suspend fun run(
+ batch: Sequence<Slice>,
+ triggerMode: TriggerMode = TriggerMode.FIRST,
+ merge: (Slice, Slice) -> Slice = { _, r -> r }
+ ) = select<Unit> { onRun(batch, triggerMode, merge).invoke {} }
+
+ /**
+ * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified
+ * by [triggerMode].
+ *
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen at any time during the execution of this method, and callers
+ * should not rely on their timing.
+ *
+ * @param slice The representation of work to request from the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ */
+ public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 =
+ onRun(sequenceOf(slice), triggerMode)
/**
- * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
- * processing a **non-zero** burst or until the deadline is reached.
+ * Ask the processor cores to run the specified [batch] of work slices and select when the trigger condition is met
+ * as specified by [triggerMode].
+ *
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen at any time during the execution of this method, and callers
+ * should not rely on their timing.
*
- * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
- * zero).
+ * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+ * slices with the next slice to be executed.
+ *
+ * @param batch The batch of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution during the **last** slice.
+ * @param merge The merge function for consecutive slices, applied when the previous slice was not completed
+ * within its deadline.
+ */
+ public fun onRun(
+ batch: Sequence<Slice>,
+ triggerMode: TriggerMode = TriggerMode.FIRST,
+ merge: (Slice, Slice) -> Slice = { _, r -> r }
+ ): SelectClause0
+
+ /**
+ * A request to the host machine for a slice of CPU time from the processor cores.
*
 * Both [burst] and [limit] must be of the same size; otherwise, the constructor throws an
 * [IllegalArgumentException].
*
* @param burst The burst time to request from each of the processor cores.
* @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
+ * @param deadline The instant at which this slice needs to be fulfilled.
*/
- public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0
+ public class Slice(val burst: LongArray, val limit: DoubleArray, val deadline: Long) {
+ init {
+ require(burst.size == limit.size) { "Incompatible array dimensions" }
+ }
+ }
+
+ /**
+ * The modes for triggering a machine exit.
+ */
+ public enum class TriggerMode {
+ /**
+ * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the
+ * deadline is reached.
+ */
+ FIRST,
+
+ /**
+ * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline
+ * is reached.
+ */
+ LAST,
+
+ /**
+ * A machine exit occurs only when the deadline is reached.
+ */
+ DEADLINE
+ }
}
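
For illustration, a minimal sketch of a caller of the new slice-based interface (the ctx parameter, core counts, and all numbers are assumptions for this example, not part of the change):

    import com.atlarge.opendc.compute.core.execution.ServerContext

    // Hypothetical workload: one slice of 2000 MFLOPs per core on two cores,
    // capped at 2400 MHz, with a deadline at t = 10_000 ms.
    suspend fun runSingle(ctx: ServerContext) {
        val slice = ServerContext.Slice(
            burst = longArrayOf(2000, 2000),
            limit = doubleArrayOf(2400.0, 2400.0),
            deadline = 10_000L
        )
        // TriggerMode.LAST: resume once every core has drained its non-zero burst,
        // or when the deadline passes, whichever comes first.
        ctx.run(slice, triggerMode = ServerContext.TriggerMode.LAST)
    }

    // Hypothetical workload: submit a whole batch in one call. The trigger mode
    // applies to the last slice; the default merge keeps the second of the two
    // merged slices.
    suspend fun runBatch(ctx: ServerContext) {
        val batch = sequenceOf(
            ServerContext.Slice(longArrayOf(500), doubleArrayOf(1000.0), 15_000L),
            ServerContext.Slice(longArrayOf(800), doubleArrayOf(1000.0), 20_000L)
        )
        ctx.run(batch)
    }
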
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index e77b55a6..d65e7e94 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,9 +26,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.ensureActive
import java.util.UUID
-import kotlin.coroutines.coroutineContext
import kotlin.math.min
/**
@@ -64,9 +62,6 @@ data class FlopsApplicationImage(
val burst = LongArray(cores) { flops / cores }
val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- while (burst.any { it != 0L }) {
- coroutineContext.ensureActive()
- ctx.run(burst, maxUsage, Long.MAX_VALUE)
- }
+ ctx.run(ServerContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = ServerContext.TriggerMode.LAST)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 36bbfa45..c615d865 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,11 +3,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.ensureActive
import java.util.UUID
-import kotlin.coroutines.coroutineContext
import kotlin.math.min
class VmImage(
@@ -21,21 +17,17 @@ class VmImage(
override suspend fun invoke(ctx: ServerContext) {
val clock = simulationContext.clock
- val job = coroutineContext[Job]!!
+ var offset = clock.millis()
- for (fragment in flopsHistory) {
- job.ensureActive()
-
- if (fragment.flops == 0L) {
- delay(fragment.duration)
- } else {
- val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
- val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage / cores }
-
- ctx.run(burst, usage, clock.millis() + fragment.duration)
- }
+ val batch = flopsHistory.map { fragment ->
+ val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val usage = DoubleArray(cores) { fragment.usage / cores }
+ offset += fragment.duration
+ ServerContext.Slice(burst, usage, offset)
}
+
+ ctx.run(batch)
}
override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$maxCores, requiredMemory=$requiredMemory)"
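
When leftover work should be carried over instead of replaced, a custom merge function can be supplied to run/onRun. Below is a sketch of such a policy; this helper is hypothetical, not part of the change. Note that the two drivers in this commit pass arguments to merge in opposite orders (SimpleBareMetalDriver calls merge(candidateSlice, slice), while SimpleVirtDriver calls merge(activeSlice, candidateSlice)), so a non-commutative merge needs care; the sketch assumes the (unfinished, next) order used by SimpleVirtDriver:

    import com.atlarge.opendc.compute.core.execution.ServerContext

    // Hypothetical merge policy: add the remaining burst of the expired slice
    // to the next slice, keeping the next slice's limit and deadline.
    val carryOver: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice =
        { unfinished, next ->
            ServerContext.Slice(
                LongArray(next.burst.size) { i ->
                    next.burst[i] + unfinished.burst.getOrElse(i) { 0L }
                },
                next.limit,
                next.deadline
            )
        }

A VmImage-style caller could then pass this policy via ctx.run(batch, merge = carryOver).
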
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 08f04760..6a77415c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -34,6 +34,7 @@ import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.image.EmptyImage
@@ -48,9 +49,12 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
@@ -61,6 +65,7 @@ import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.withContext
import java.lang.Exception
+import java.time.Clock
import kotlin.coroutines.ContinuationInterceptor
import kotlin.random.Random
@@ -75,6 +80,7 @@ import kotlin.random.Random
* @param memoryUnits The memory units in this machine.
* @param powerModel The power model of this machine.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public class SimpleBareMetalDriver(
private val domain: Domain,
uid: UUID,
@@ -104,7 +110,7 @@ public class SimpleBareMetalDriver(
/**
* The flow containing the load of the server.
*/
- private val usageState = StateFlow(0.0)
+ private val usageState = MutableStateFlow(0.0)
/**
* The machine state.
@@ -113,6 +119,7 @@ public class SimpleBareMetalDriver(
override val node: Flow<Node> = nodeState
+ @OptIn(FlowPreview::class)
override val usage: Flow<Double> = usageState
override val powerDraw: Flow<Double> = powerModel(this)
@@ -252,79 +259,200 @@ public class SimpleBareMetalDriver(
setNode(nodeState.value.copy(state = newNodeState, server = server))
}
- private var flush: DisposableHandle? = null
+ /**
+ * A disposable handle for the pending usage-state reset, cancelled when [onRun] is called again at the same timestamp.
+ */
+ private var usageFlush: DisposableHandle? = null
+
+ /**
+ * Cache the [Clock] for timing.
+ */
+ private val clock = domain.coroutineContext[SimulationContext]!!.clock
+
+ /**
+ * Cache the [Delay] instance for timing.
+ *
+ * XXX We need to cache this before the call to [onRun], since looking it up on every [onRun] call is too expensive.
+ * XXX Note however that this is an ugly hack which may break in the future.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay
@OptIn(InternalCoroutinesApi::class)
- override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 {
- require(burst.size == limit.size) { "Array dimensions do not match" }
+ override fun onRun(
+ batch: Sequence<ServerContext.Slice>,
+ triggerMode: ServerContext.TriggerMode,
+ merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+ ): SelectClause0 {
assert(!finalized) { "Server instance is already finalized" }
return object : SelectClause0 {
@InternalCoroutinesApi
override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- // If run is called in at the same timestamp as the previous call, cancel the load flush
- flush?.dispose()
- flush = null
+ // Do not reset the usage state: we will set it ourselves
+ usageFlush?.dispose()
+ usageFlush = null
+
+ val queue = batch.iterator()
+ var start = Long.MIN_VALUE
+ var currentWork: SliceWork? = null
+ var currentDisposable: DisposableHandle? = null
+
+ fun schedule(slice: ServerContext.Slice) {
+ start = clock.millis()
+
+ val isLastSlice = !queue.hasNext()
+ val work = SliceWork(slice)
+ val candidateDuration = when (triggerMode) {
+ ServerContext.TriggerMode.FIRST -> work.minExit
+ ServerContext.TriggerMode.LAST -> work.maxExit
+ ServerContext.TriggerMode.DEADLINE -> slice.deadline - start
+ }
+
+ // Check whether the deadline is exceeded during the run of the slice.
+ val duration = min(candidateDuration, slice.deadline - start)
+
+ val action = Runnable {
+ currentWork = null
+
+ // Flush all the work that was performed
+ val hasFinished = work.stop(duration)
+
+ if (!isLastSlice) {
+ val candidateSlice = queue.next()
+ val nextSlice =
+ // If our previous slice exceeds its deadline, merge it with the next candidate slice
+ if (hasFinished)
+ candidateSlice
+ else
+ merge(candidateSlice, slice)
+ schedule(nextSlice)
+ } else if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+
+ // Schedule the flush after the entire slice has finished
+ currentDisposable = delay.invokeOnTimeout(duration, action)
- val context = select.completion.context
- val simulationContext = context[SimulationContext]!!
- val delay = context[ContinuationInterceptor] as Delay
+ // Start the slice work
+ currentWork = work
+ work.start()
+ }
- val start = simulationContext.clock.millis()
- var duration = max(0, deadline - start)
- var totalUsage = 0.0
+ // Schedule the first work
+ if (queue.hasNext()) {
+ schedule(queue.next())
- // Determine the duration of the first CPU to finish
- for (i in 0 until min(cpus.size, burst.size)) {
- val cpu = cpus[i]
- val usage = min(limit[i], cpu.frequency)
- val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+ // A DisposableHandle to flush the work in case the call is cancelled
+ val disposable = DisposableHandle {
+ val end = clock.millis()
+ val duration = end - start
- totalUsage += usage / cpu.frequency
+ currentWork?.stop(duration)
+ currentDisposable?.dispose()
- if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
- duration = min(duration, cpuDuration)
+ // Schedule a reset of the machine's usage state since the call is returning
+ usageFlush = delay.invokeOnTimeout(1, Runnable {
+ usageState.value = 0.0
+ usageFlush = null
+ })
}
- }
- if (!unavailable) {
- delay.invokeOnTimeout(1, Runnable {
- usageState.value = totalUsage / cpus.size
- })
+ select.disposeOnSelect(disposable)
+ } else if (select.trySelect()) {
+ // No work has been given: select immediately
+ block.startCoroutineCancellable(select.completion)
}
+ }
+ }
+ }
- val action = Runnable {
- // todo: we could have replaced startCoroutine with startCoroutineUndispatched
- // But we need a way to know that Delay.invokeOnTimeout had used the right thread
- if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch
- }
+ /**
+ * A slice to be processed.
+ */
+ private inner class SliceWork(val slice: ServerContext.Slice) {
+ /**
+ * The duration after which the first processor finishes processing this slice.
+ */
+ public val minExit: Long
+
+ /**
+ * The duration after which the last processor finishes processing this slice.
+ */
+ public val maxExit: Long
+
+ /**
+ * A flag to indicate that the slice will exceed the deadline.
+ */
+ public val exceedsDeadline: Boolean
+ get() = slice.deadline < maxExit
+
+ /**
+ * The total amount of CPU usage.
+ */
+ public val totalUsage: Double
+
+ /**
+ * A flag to indicate that this slice is empty.
+ */
+ public val isEmpty: Boolean
+
+ init {
+ var totalUsage = 0.0
+ var minExit = Long.MAX_VALUE
+ var maxExit = 0L
+ var nonEmpty = false
+
+ // Determine the duration of the first/last CPU to finish
+ for (i in 0 until min(cpus.size, slice.burst.size)) {
+ val cpu = cpus[i]
+ val usage = min(slice.limit[i], cpu.frequency)
+ val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ totalUsage += usage / cpu.frequency
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ minExit = min(minExit, cpuDuration)
+ maxExit = max(maxExit, cpuDuration)
+ nonEmpty = true
}
+ }
- val disposable = delay.invokeOnTimeout(duration, action)
- val flush = DisposableHandle {
- val end = simulationContext.clock.millis()
-
- // Flush the load if they do not receive a new run call for the same timestamp
- flush = delay.invokeOnTimeout(1, Runnable {
- usageState.value = 0.0
- flush = null
- })
-
- if (!unavailable) {
- // Write back the remaining burst time
- for (i in 0 until min(cpus.size, burst.size)) {
- val usage = min(limit[i], cpus[i].frequency)
- val granted = ceil((end - start) / 1000.0 * usage).toLong()
- burst[i] = max(0, burst[i] - granted)
- }
- }
+ this.isEmpty = !nonEmpty
+ this.totalUsage = totalUsage
+ this.minExit = minExit
+ this.maxExit = maxExit
+ }
- disposable.dispose()
- }
+ /**
+ * Indicate that the work on the slice has started.
+ */
+ public fun start() {
+ usageState.value = totalUsage / cpus.size
+ }
- select.disposeOnSelect(flush)
+ /**
+ * Flush the work performed on the slice.
+ */
+ public fun stop(duration: Long): Boolean {
+ var hasFinished = true
+
+ // Only flush the work if the machine is available
+ if (!unavailable) {
+ for (i in 0 until min(cpus.size, slice.burst.size)) {
+ val usage = min(slice.limit[i], cpus[i].frequency)
+ val granted = ceil(duration / 1000.0 * usage).toLong()
+ val res = max(0, slice.burst[i] - granted)
+ slice.burst[i] = res
+
+ if (res != 0L) {
+ hasFinished = false
+ }
+ }
}
+
+ return hasFinished
}
}
}
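
The per-core exit times that SliceWork derives follow directly from the requested burst and the granted rate. A standalone sketch of that arithmetic (illustrative values, assuming bursts in MFLOPs and frequencies in MHz as elsewhere in the driver):

    import kotlin.math.ceil
    import kotlin.math.min

    // Duration until a core finishes its burst: burst divided by the granted
    // rate min(limit, frequency), converted from seconds to milliseconds.
    fun coreDurationMs(burst: Long, limit: Double, frequency: Double): Long =
        ceil(burst / min(limit, frequency) * 1000).toLong()

    fun main() {
        // A 3200 MHz core capped at 1600 MHz running a 4800 MFLOP burst:
        // 4800 / 1600 = 3 s, i.e. 3000 ms.
        println(coreDurationMs(4800, 1600.0, 3200.0)) // 3000
        // TriggerMode.FIRST resumes at the minimum of these per-core durations,
        // TriggerMode.LAST at the maximum, and DEADLINE only at the deadline.
    }
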
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index ce814dd8..3c41f52e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.compute.virt.driver
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
@@ -35,7 +34,6 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
-import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
@@ -44,6 +42,7 @@ import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -51,15 +50,12 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
-import java.util.Objects
-import java.util.TreeSet
import java.util.UUID
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -81,7 +77,7 @@ class SimpleVirtDriver(
/**
* A set for tracking the VM context objects.
*/
- internal val vms: MutableSet<VmServerContext> = mutableSetOf()
+ private val vms: MutableSet<VmServerContext> = mutableSetOf()
/**
* Current total memory use of the images on this hypervisor.
@@ -125,7 +121,7 @@ class SimpleVirtDriver(
ServiceRegistry(), events
)
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, events, simulationContext.domain))
+ vms.add(VmServerContext(server, events))
vmStarted(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
@@ -156,9 +152,14 @@ class SimpleVirtDriver(
*/
private sealed class SchedulerCommand {
/**
- * Schedule the specified vCPUs of a single VM.
+ * Schedule the specified VM on the hypervisor.
*/
- data class Schedule(val vm: VmServerContext, val requests: Collection<CpuRequest>) : SchedulerCommand()
+ data class Schedule(val vm: Vm) : SchedulerCommand()
+
+ /**
+ * De-schedule the specified VM from the hypervisor.
+ */
+ data class Deschedule(val vm: Vm) : SchedulerCommand()
/**
* Interrupt the scheduler.
@@ -184,8 +185,8 @@ class SimpleVirtDriver(
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
- val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
- val requests = TreeSet(cpuRequestComparator)
+ val vms = mutableSetOf<Vm>()
+ val vcpus = mutableListOf<VCpu>()
val usage = DoubleArray(hostContext.cpus.size)
val burst = LongArray(hostContext.cpus.size)
@@ -193,10 +194,14 @@ class SimpleVirtDriver(
fun process(command: SchedulerCommand) {
when (command) {
is SchedulerCommand.Schedule -> {
- vms[command.vm] = command.requests
- requests.removeAll(command.requests)
- requests.addAll(command.requests)
+ vms += command.vm
+ vcpus.addAll(command.vm.vcpus)
}
+ is SchedulerCommand.Deschedule -> {
+ vms -= command.vm
+ vcpus.removeAll(command.vm.vcpus)
+ }
+ is SchedulerCommand.Interrupt -> {}
}
}
@@ -210,7 +215,7 @@ class SimpleVirtDriver(
while (!stopped) {
// Wait for a request to be submitted if we have no work yet.
- if (requests.isEmpty()) {
+ if (vcpus.isEmpty()) {
process(schedulingQueue.receive())
}
@@ -225,21 +230,33 @@ class SimpleVirtDriver(
var totalRequestedUsage = 0.0
var totalRequestedBurst = 0L
+ // Sort the vCPUs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ vcpus.sort()
+
// Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- for ((i, req) in requests.withIndex()) {
- val remaining = requests.size - i
+ for ((i, req) in vcpus.withIndex()) {
+ val remaining = vcpus.size - i
val availableShare = availableUsage / remaining
val grantedUsage = min(req.limit, availableShare)
+ // Take into account the minimum deadline of this slice before we possibly continue
+ deadline = min(deadline, req.vm.deadline)
+
+ // Ignore empty CPUs
+ if (grantedUsage <= 0 || req.burst <= 0) {
+ req.allocatedLimit = 0.0
+ continue
+ }
+
totalRequestedUsage += req.limit
totalRequestedBurst += req.burst
- req.allocatedUsage = grantedUsage
+ req.allocatedLimit = grantedUsage
availableUsage -= grantedUsage
// The duration that we want to run is that of the shortest request from a vCPU
duration = min(duration, req.burst / grantedUsage)
- deadline = min(deadline, req.vm.deadline)
}
// XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
@@ -265,9 +282,9 @@ class SimpleVirtDriver(
// We run the total burst on the host processor. Note that this call may be cancelled at any moment in
// time, so not all of the burst may be executed.
- val interrupted = select<Boolean> {
+ select<Boolean> {
schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- hostContext.onRun(burst, usage, deadline).invoke { false }
+ hostContext.onRun(ServerContext.Slice(burst, usage, deadline), ServerContext.TriggerMode.DEADLINE).invoke { false }
}
val end = clock.millis()
@@ -278,7 +295,7 @@ class SimpleVirtDriver(
}
// The total requested burst that the VMs wanted to run in the time-frame that we ran.
- val totalRequestedSubBurst = requests.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
+ val totalRequestedSubBurst = vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
val totalRemainder = burst.sum()
val totalGrantedBurst = totalAllocatedBurst - totalRemainder
@@ -287,19 +304,19 @@ class SimpleVirtDriver(
// The burst that was lost due to interference.
var totalInterferedBurst = 0L
- val entryIterator = vms.entries.iterator()
- while (entryIterator.hasNext()) {
- val (vm, vmRequests) = entryIterator.next()
+ val vmIterator = vms.iterator()
+ while (vmIterator.hasNext()) {
+ val vm = vmIterator.next()
// Apply performance interference model
val performanceModel =
- vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ vm.ctx.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0
var hasFinished = false
- for ((i, req) in vmRequests.withIndex()) {
+ for (vcpu in vm.vcpus) {
// Compute the fraction of compute time allocated to the VM
- val fraction = req.allocatedUsage / totalAllocatedUsage
+ val fraction = vcpu.allocatedLimit / totalAllocatedUsage
// Compute the burst time that the VM was actually granted
val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
@@ -310,25 +327,21 @@ class SimpleVirtDriver(
totalInterferedBurst += grantedBurst - usedBurst
// Compute remaining burst time to be executed for the request
- req.burst = max(0, vm.burst[i] - usedBurst)
- vm.burst[i] = req.burst
-
- if (req.burst <= 0L || req.isCancelled) {
+ if (vcpu.consume(usedBurst)) {
hasFinished = true
} else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
// Request must have its entire burst consumed or otherwise we have overcommission
// Note that we count the overcommissioned burst if the hypervisor has failed.
- totalOvercommissionedBurst += req.burst
+ totalOvercommissionedBurst += vcpu.burst
}
}
if (hasFinished || vm.deadline <= end) {
- // Deschedule all requests from this VM
- entryIterator.remove()
- requests.removeAll(vmRequests)
-
- // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.chan?.resume(Unit)
+ // Mark the VM as finished and deschedule the VMs if needed
+ if (vm.finish()) {
+ vmIterator.remove()
+ vcpus.removeAll(vm.vcpus)
+ }
}
}
@@ -349,57 +362,182 @@ class SimpleVirtDriver(
}
/**
- * The [Comparator] for [CpuRequest].
+ * A virtual machine running on the hypervisor.
+ *
+ * @param ctx The execution context in which the VM runs.
+ * @param triggerMode The mode that determines when to trigger the VM exit.
+ * @param merge The function to merge consecutive slices on spillover.
+ * @param select The function to select on finish.
*/
- private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs ->
- var cmp = lhs.limit.compareTo(rhs.limit)
+ @OptIn(InternalCoroutinesApi::class)
+ private data class Vm(
+ val ctx: VmServerContext,
+ var triggerMode: ServerContext.TriggerMode = ServerContext.TriggerMode.FIRST,
+ var merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice = { _, r -> r },
+ var select: () -> Unit = {}
+ ) {
+ /**
+ * The vCPUs of this virtual machine.
+ */
+ val vcpus: List<VCpu>
+
+ /**
+ * The slices that the VM wants to run.
+ */
+ var queue: Iterator<ServerContext.Slice> = emptyList<ServerContext.Slice>().iterator()
+
+ /**
+ * The current active slice.
+ */
+ var activeSlice: ServerContext.Slice? = null
+
+ /**
+ * The current deadline of the VM.
+ */
+ val deadline: Long
+ get() = activeSlice?.deadline ?: Long.MAX_VALUE
- if (cmp != 0) {
- return@Comparator cmp
+ /**
+ * A flag to indicate that the VM is idle.
+ */
+ val isIdle: Boolean
+ get() = activeSlice == null
+
+ init {
+ vcpus = ctx.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
}
- cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid)
+ /**
+ * Schedule the given slices on this VM, replacing the existing slices.
+ */
+ fun schedule(slices: Sequence<ServerContext.Slice>) {
+ queue = slices.iterator()
+
+ if (queue.hasNext()) {
+ activeSlice = queue.next()
+ vcpus.forEach { it.refresh() }
+ }
+ }
- if (cmp != 0) {
- return@Comparator cmp
+ /**
+ * Cancel the existing workload on the VM.
+ */
+ fun cancel() {
+ queue = emptyList<ServerContext.Slice>().iterator()
+ activeSlice = null
+ vcpus.forEach { it.refresh() }
}
- lhs.vcpu.id.compareTo(rhs.vcpu.id)
+ /**
+ * Finish the current slice of the VM.
+ *
+ * @return `true` if the vCPUs may be descheduled, `false` otherwise.
+ */
+ fun finish(): Boolean {
+ val activeSlice = activeSlice ?: return true
+
+ return if (queue.hasNext()) {
+ val needsMerge = activeSlice.burst.any { it > 0 }
+ val candidateSlice = queue.next()
+ val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
+
+ this.activeSlice = slice
+
+ // Update the vCPU cache
+ vcpus.forEach { it.refresh() }
+
+ false
+ } else {
+ this.activeSlice = null
+ select()
+ true
+ }
+ }
}
/**
- * A request to schedule a virtual CPU on the host cpu.
+ * A virtual CPU that can be scheduled on a physical CPU.
+ *
+ * @param vm The VM of which this vCPU is part.
+ * @param model The processing unit that this vCPU represents.
+ * @param id The id of the vCPU with respect to the VM.
*/
- internal data class CpuRequest(
- val vm: VmServerContext,
- val vcpu: ProcessingUnit,
- var burst: Long,
- var limit: Double
- ) {
+ private data class VCpu(
+ val vm: Vm,
+ val model: ProcessingUnit,
+ val id: Int
+ ) : Comparable<VCpu> {
+ /**
+ * The current limit on the vCPU.
+ */
+ var limit: Double = 0.0
+
+ /**
+ * The limit allocated by the hypervisor.
+ */
+ var allocatedLimit: Double = 0.0
+
/**
- * The usage that was actually granted.
+ * The current burst running on the vCPU.
*/
- var allocatedUsage: Double = 0.0
+ var burst: Long = 0L
+
+ /**
+ * Consume the specified burst on this vCPU.
+ */
+ fun consume(burst: Long): Boolean {
+ this.burst = max(0, this.burst - burst)
+
+ // Flush the result to the slice if it exists
+ vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
+
+ return allocatedLimit > 0.0 && this.burst == 0L
+ }
+
+ /**
+ * Refresh the information of this vCPU based on the current slice.
+ */
+ fun refresh() {
+ limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
+ burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
+ }
/**
- * A flag to indicate the request was cancelled.
+ * Compare to another vCPU based on the current load of the vCPU.
*/
- var isCancelled: Boolean = false
+ override fun compareTo(other: VCpu): Int {
+ var cmp = limit.compareTo(other.limit)
- override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu
- override fun hashCode(): Int = Objects.hash(vm, vcpu)
+ if (cmp != 0) {
+ return cmp
+ }
+
+ cmp = vm.ctx.server.uid.compareTo(other.vm.ctx.server.uid)
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ return id.compareTo(other.id)
+ }
+
+ /**
+ * Create a string representation of the vCPU.
+ */
+ override fun toString(): String =
+ "vCPU(vm=${vm.ctx.server.uid},id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
}
- internal inner class VmServerContext(
- server: Server,
- val events: EventFlow<ServerEvent>,
- val domain: Domain
- ) : ServerManagementContext {
+ /**
+ * The execution context in which a VM runs.
+ *
+ * @param server The details of the VM.
+ * @param events The event stream to publish to.
+ */
+ private inner class VmServerContext(server: Server, val events: EventFlow<ServerEvent>) : ServerManagementContext, DisposableHandle {
private var finalized: Boolean = false
- lateinit var burst: LongArray
- var deadline: Long = 0L
- var chan: Continuation<Unit>? = null
private var initialized: Boolean = false
+ private val vm: Vm
internal val job: Job = launch {
delay(1) // TODO Introduce boot time
@@ -423,6 +561,10 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ init {
+ vm = Vm(this)
+ }
+
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
server = server.copy(services = server.services.put(key, service))
events.emit(ServerEvent.ServicePublished(server, key))
@@ -451,37 +593,33 @@ class SimpleVirtDriver(
events.close()
}
- override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
- require(burst.size == limit.size) { "Array dimensions do not match" }
- this.deadline = deadline
- this.burst = burst
-
- val requests = cpus.asSequence()
- .take(burst.size)
- .mapIndexed { i, cpu ->
- CpuRequest(
- this,
- cpu,
- burst[i],
- limit[i]
- )
+ @OptIn(InternalCoroutinesApi::class)
+ override fun onRun(
+ batch: Sequence<ServerContext.Slice>,
+ triggerMode: ServerContext.TriggerMode,
+ merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+ ): SelectClause0 = object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+ vm.triggerMode = triggerMode
+ vm.merge = merge
+ vm.select = {
+ if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
}
- .toList()
-
- // Wait until the burst has been run or the coroutine is cancelled
- try {
- schedulingQueue.offer(SchedulerCommand.Schedule(this, requests))
- suspendCoroutine<Unit> { chan = it }
- } catch (e: CancellationException) {
- // Deschedule the VM
- requests.forEach { it.isCancelled = true }
- schedulingQueue.offer(SchedulerCommand.Interrupt)
- suspendCoroutine<Unit> { chan = it }
- e.assertFailure()
+ vm.schedule(batch)
+ // Indicate to the hypervisor that the VM should be re-scheduled
+ schedulingQueue.offer(SchedulerCommand.Schedule(vm))
+ select.disposeOnSelect(this@VmServerContext)
}
}
- @OptIn(InternalCoroutinesApi::class)
- override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO()
+ override fun dispose() {
+ if (!vm.isIdle) {
+ vm.cancel()
+ schedulingQueue.offer(SchedulerCommand.Deschedule(vm))
+ }
+ }
}
}
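
The capacity division in the scheduler loop is plain max-min fair sharing over the vCPUs, sorted by requested limit. A self-contained sketch of that allocation step (names and values are illustrative, not the driver's actual types):

    import kotlin.math.min

    // Max-min fair sharing: walk the requests in ascending order of their limit
    // and grant each one min(limit, equal share of the remaining capacity).
    fun maxMinShare(limits: List<Double>, capacity: Double): List<Double> {
        var available = capacity
        val granted = DoubleArray(limits.size)
        val order = limits.indices.sortedBy { limits[it] }
        for ((i, idx) in order.withIndex()) {
            val share = available / (order.size - i)
            granted[idx] = min(limits[idx], share)
            available -= granted[idx]
        }
        return granted.toList()
    }

    fun main() {
        // Three vCPUs asking for 1000, 4000 and 6000 MHz on a 6400 MHz host:
        // the small request is satisfied, the rest split the remainder evenly.
        println(maxMinShare(listOf(1000.0, 4000.0, 6000.0), 6400.0))
        // [1000.0, 2700.0, 2700.0]
    }
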
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index c3d9c745..ff4aa3d7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -180,7 +180,7 @@ class SimpleVirtProvisioningService(
}
try {
- logger.info { "Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -214,7 +214,7 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+ logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
this@SimpleVirtProvisioningService,
@@ -254,6 +254,8 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
+ logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" }
+
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
availableHypervisors += hypervisors.getValue(server)
@@ -280,8 +282,14 @@ class SimpleVirtProvisioningService(
queuedVms,
unscheduledVms
))
+
+ // Re-schedule on the new machine
+ if (incomingImages.isNotEmpty()) {
+ requestCycle()
+ }
}
ServerState.SHUTOFF, ServerState.ERROR -> {
+ logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 0fc64373..071c0626 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,12 +25,15 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
@@ -61,9 +64,15 @@ internal class SimpleBareMetalDriverTest {
driver.init()
driver.setImage(image)
val server = driver.start().server!!
+ driver.usage
+ .onEach { println("${simulationContext.clock.millis()} $it") }
+ .launchIn(this)
server.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
+ is ServerEvent.StateChanged -> {
+ println("${simulationContext.clock.millis()} $event")
+ finalState = event.server.state
+ }
}
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 4f3abc02..ca00fc94 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -29,6 +29,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -37,7 +39,10 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import java.util.ServiceLoader
import java.util.UUID
@@ -50,6 +55,7 @@ internal class HypervisorTest {
*/
@OptIn(ExperimentalCoroutinesApi::class)
@Test
+ @Disabled
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider("test")
@@ -87,4 +93,75 @@ internal class HypervisorTest {
system.terminate()
}
}
+
+ /**
+ * Test overcommissioning of a hypervisor.
+ */
+ @Test
+ fun overcommission() {
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ var requestedBurst = 0L
+ var grantedBurst = 0L
+ var overcommissionedBurst = 0L
+
+ root.launch {
+ val vmm = HypervisorImage
+ val duration = 5 * 60L
+ val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ), 2, 0)
+ val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ ), 2, 0)
+
+ val driverDom = root.newDomain("driver")
+
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
+ val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+
+ metalDriver.init()
+ metalDriver.setImage(vmm)
+ metalDriver.start()
+
+ delay(5)
+
+ val flavor = Flavor(2, 0)
+ val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
+ vmDriver.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> {
+ requestedBurst += event.requestedBurst
+ grantedBurst += event.grantedBurst
+ overcommissionedBurst += event.overcommissionedBurst
+ }
+ }
+ }
+ .launchIn(this)
+
+ vmDriver.spawn("a", vmImageA, flavor)
+ vmDriver.spawn("b", vmImageB, flavor)
+ }
+
+ runBlocking {
+ system.run()
+ system.terminate()
+ }
+
+ assertAll(
+ { assertEquals(2073600, requestedBurst, "Requested Burst does not match") },
+ { assertEquals(2013600, grantedBurst, "Granted Burst does not match") },
+ { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") }
+ )
+ }
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 83952d43..9d2b0247 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -172,7 +172,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
.onEach { event ->
when (event) {
is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
- simulationContext.clock.millis(),
+ clock.millis(),
event.requestedBurst,
event.grantedBurst,
event.overcommissionedBurst,
@@ -242,10 +242,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
// Monitor server events
server.events
.onEach {
- val time = simulationContext.clock.millis()
-
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(time, it.server)
+ monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
}
delay(1)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index 138905a4..7f71eb3e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import com.atlarge.opendc.experiments.sc20.experiment.Run
@@ -54,9 +53,17 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
run.parent.parent.parent.bufferSize
)
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+ private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {}
+ override fun reportVmStateChange(time: Long, server: Server) {
+ if (startTime < 0) {
+ startTime = time
+
+ // Update timestamp of initial event
+ currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) }
+ }
+ }
override fun reportHostStateChange(
time: Long,
@@ -65,27 +72,31 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
) {
logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
- val lastServerState = lastServerStates[server]
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = time - lastServerState.second
- reportHostSlice(
- time,
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- duration
- )
+ val previousEvent = currentHostEvent[server]
- lastServerStates.remove(server)
- lastPowerConsumption.remove(server)
- } else {
- lastServerStates[server] = Pair(server.state, time)
- }
+ val roundedTime = previousEvent?.let {
+ val duration = time - it.timestamp
+ val k = 5 * 60 * 1000L // 5 min in ms
+ val rem = duration % k
+
+ if (rem == 0L) {
+ time
+ } else {
+ it.timestamp + duration + k - rem
+ }
+ } ?: time
+
+ reportHostSlice(
+ roundedTime,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server
+ )
}
private val lastPowerConsumption = mutableMapOf<Server, Double>()
@@ -106,23 +117,62 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
hostServer: Server,
duration: Long
) {
- lastServerStates[hostServer] = Pair(hostServer.state, time)
+ val previousEvent = currentHostEvent[hostServer]
+ when {
+ previousEvent == null -> {
+ val event = HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
- hostWriter.write(
- HostEvent(
- time,
- duration,
- hostServer,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
- )
- )
+ currentHostEvent[hostServer] = event
+ }
+ previousEvent.timestamp == time -> {
+ val event = HostEvent(
+ time,
+ previousEvent.duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ else -> {
+ hostWriter.write(previousEvent)
+
+ val event = HostEvent(
+ time,
+ time - previousEvent.timestamp,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ }
}
override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
@@ -141,6 +191,12 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
}
override fun close() {
+ // Flush remaining events
+ for ((_, event) in currentHostEvent) {
+ hostWriter.write(event)
+ }
+ currentHostEvent.clear()
+
hostWriter.close()
provisionerWriter.close()
}
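
The timestamp computation in reportHostStateChange rounds the closing slice up to the monitor's 5-minute reporting grid, measured from the previous event. A small standalone sketch of that computation (illustrative values):

    // Round the end of a host slice up to the next 5-minute boundary
    // relative to the previous event's timestamp.
    fun roundToGrid(previousTimestamp: Long, time: Long): Long {
        val k = 5 * 60 * 1000L // 5 min in ms
        val duration = time - previousTimestamp
        val rem = duration % k
        return if (rem == 0L) time else previousTimestamp + duration + k - rem
    }

    fun main() {
        // An event 7 minutes after the previous one is reported at the
        // 10-minute mark; one exactly on the grid keeps its timestamp.
        println(roundToGrid(0, 7 * 60 * 1000L))  // 600000 (10 min)
        println(roundToGrid(0, 10 * 60 * 1000L)) // 600000 (10 min)
    }
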
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index abd5c961..68c2cbc5 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import java.io.File
import java.util.ServiceLoader
@@ -134,6 +136,7 @@ class Sc20IntegrationTest {
failureDomain?.cancel()
scheduler.terminate()
+ monitor.close()
}
runSimulation()
@@ -147,6 +150,48 @@ class Sc20IntegrationTest {
assertEquals(0, monitor.totalInterferedBurst)
}
+ @Test
+ fun small() {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ root.launch {
+ val res = createProvisioner(
+ root,
+ environmentReader,
+ allocationPolicy
+ )
+ scheduler = res.second
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(96344114723, monitor.totalRequestedBurst) },
+ { assertEquals(96324378235, monitor.totalGrantedBurst) },
+ { assertEquals(19736424, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
+ }
+
/**
* Run the simulation.
*/
@@ -157,20 +202,20 @@ class Sc20IntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(): TraceReader<VmWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
return Sc20ParquetTraceReader(
Sc20RawParquetTraceReader(File("src/test/resources/trace")),
emptyMap(),
- Workload("test", 1.0),
- 0
+ Workload("test", fraction),
+ seed
)
}
/**
* Obtain the environment reader for the test.
*/
- private fun createTestEnvironmentReader(): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt")
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
return Sc20ClusterEnvironmentReader(stream)
}
@@ -197,6 +242,7 @@ class Sc20IntegrationTest {
totalOvercommissionedBurst += overcommissionedBurst
totalInterferedBurst += interferedBurst
}
+
override fun close() {}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+