| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-17 18:30:54 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-18 22:58:12 +0200 |
| commit | df5d9363e4e3558cb6e2f7f421412548b6d7d36a | |
| tree | b568fc48418b2146c989d7e519073d96e5d13073 | |
| parent | ee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 | |
perf: Batch slice submission
8 files changed, 270 insertions, 87 deletions
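Reviewer's note: the gist of the change is that workloads now submit CPU time as `ServerContext.Slice` objects, optionally in batches with a `TriggerMode` and a `merge` policy for slices that miss their deadline. Below is a minimal sketch of a hypothetical caller of the new API; the 4-core machine, the burst and limit values, and the burst-summing merge policy are invented for illustration, and the merge argument order follows the `merge(candidateSlice, slice)` call in `SimpleBareMetalDriver` further down.

```kotlin
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerContext.Slice
import com.atlarge.opendc.compute.core.execution.ServerContext.TriggerMode

// Two 4-core slices, each requesting 5800 units of burst per core at up to 2900 MHz,
// with deadlines 5 s apart (timestamps in milliseconds).
suspend fun submit(ctx: ServerContext, now: Long) {
    val batch = listOf(
        Slice(LongArray(4) { 5_800 }, DoubleArray(4) { 2900.0 }, now + 5_000),
        Slice(LongArray(4) { 5_800 }, DoubleArray(4) { 2900.0 }, now + 10_000)
    )
    ctx.run(batch, TriggerMode.LAST) { next, unfinished ->
        // Hypothetical policy: fold the leftover burst of an unfinished slice into the next one.
        Slice(
            LongArray(next.burst.size) { i -> next.burst[i] + unfinished.burst[i] },
            next.limit,
            next.deadline
        )
    }
}
```

With `TriggerMode.LAST` the call resumes once all cores have drained their burst or the final deadline passes; the default `TriggerMode.FIRST` resumes as soon as the first core finishes a non-zero burst.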
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 663fa5e4..027ba410 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -51,36 +51,113 @@ public interface ServerContext {
     public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
 
     /**
-     * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
-     * processing a **non-zero** burst or until the deadline is reached.
+     * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as
+     * specified by [triggerMode].
      *
-     * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
-     * zero).
+     * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+     * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+     * the timing of these changes.
      *
-     * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
-     * [IllegalArgumentException].
+     * @param slice The representation of work to run on the processors.
+     * @param triggerMode The trigger condition to resume execution.
+     */
+    public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST) =
+        select<Unit> { onRun(slice, triggerMode).invoke {} }
+
+    /**
+     * Ask the processor cores to run the specified [batch] of work slices and suspend execution until the trigger
+     * condition is met as specified by [triggerMode].
      *
-     * @param burst The burst time to request from each of the processor cores.
-     * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
-     * @param deadline The instant at which this request needs to be fulfilled.
+     * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+     * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+     * the timing of these changes.
+     *
+     * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+     * slices with the next slice to be executed.
+     *
+     * @param batch The batch of work to run on the processors.
+     * @param triggerMode The trigger condition to resume execution.
+     * @param merge The merge function for consecutive slices in case the last slice was not completed within its
+     * deadline.
      */
-    public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
-        select<Unit> { onRun(burst, limit, deadline).invoke {} }
-    }
+    public suspend fun run(
+        batch: List<Slice>,
+        triggerMode: TriggerMode = TriggerMode.FIRST,
+        merge: (Slice, Slice) -> Slice = { _, r -> r }
+    ) = select<Unit> { onRun(batch, triggerMode, merge).invoke {} }
+
+    /**
+     * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified
+     * by [triggerMode].
+     *
+     * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+     * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+     * the timing of these changes.
+     *
+     * @param slice The representation of work to request from the processors.
+     * @param triggerMode The trigger condition to resume execution.
+     */
+    public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 =
+        onRun(listOf(slice), triggerMode)
 
     /**
-     * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
-     * processing a **non-zero** burst or until the deadline is reached.
+     * Ask the processor cores to run the specified [batch] of work slices and select when the trigger condition is met
+     * as specified by [triggerMode].
+     *
+     * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+     * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+     * the timing of these changes.
      *
-     * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
-     * zero).
+     * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+     * slices with the next slice to be executed.
+     *
+     * @param batch The batch of work to run on the processors.
+     * @param triggerMode The trigger condition to resume execution during the **last** slice.
+     * @param merge The merge function for consecutive slices in case the last slice was not completed within its
+     * deadline.
+     */
+    public fun onRun(
+        batch: List<Slice>,
+        triggerMode: TriggerMode = TriggerMode.FIRST,
+        merge: (Slice, Slice) -> Slice = { _, r -> r }
+    ): SelectClause0
+
+    /**
+     * A request to the host machine for a slice of CPU time from the processor cores.
      *
      * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
      * [IllegalArgumentException].
      *
+     *
      * @param burst The burst time to request from each of the processor cores.
      * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
-     * @param deadline The instant at which this request needs to be fulfilled.
+     * @param deadline The instant at which this slice needs to be fulfilled.
      */
-    public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0
+    public class Slice(val burst: LongArray, val limit: DoubleArray, val deadline: Long) {
+        init {
+            require(burst.size == limit.size) { "Incompatible array dimensions" }
+        }
+    }
+
+    /**
+     * The modes for triggering an exit from the machine.
+     */
+    public enum class TriggerMode {
+        /**
+         * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the
+         * deadline is reached.
+         */
+        FIRST,
+
+        /**
+         * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the
+         * deadline is reached.
+         */
+        LAST,
+
+        /**
+         * A machine exit occurs only when the deadline is reached.
+         */
+        DEADLINE
+    }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index e77b55a6..d65e7e94 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,9 +26,7 @@ package com.atlarge.opendc.compute.core.image
 
 import com.atlarge.opendc.compute.core.execution.ServerContext
 import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.ensureActive
 import java.util.UUID
-import kotlin.coroutines.coroutineContext
 import kotlin.math.min
 
 /**
@@ -64,9 +62,6 @@ data class FlopsApplicationImage(
         val burst = LongArray(cores) { flops / cores }
         val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
 
-        while (burst.any { it != 0L }) {
-            coroutineContext.ensureActive()
-            ctx.run(burst, maxUsage, Long.MAX_VALUE)
-        }
+        ctx.run(ServerContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = ServerContext.TriggerMode.LAST)
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 36bbfa45..30a091b1 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -33,7 +33,7 @@ class VmImage(
             val burst = LongArray(cores) { fragment.flops / cores }
             val usage = DoubleArray(cores) { fragment.usage / cores }
 
-            ctx.run(burst, usage, clock.millis() + fragment.duration)
+            ctx.run(ServerContext.Slice(burst, usage, clock.millis() + fragment.duration))
         }
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 41cec291..a84fb905 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -43,8 +43,7 @@ public interface BareMetalDriver : Powerable, FailureDomain {
     public val node: Flow<Node>
 
     /**
-     * The amount of work done by the machine in percentage with respect to the total amount of processing power
-     * available.
+     * The amount of work done by the machine in MHz.
      */
     public val usage: Flow<Double>
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 08f04760..2d885a8c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -28,12 +28,14 @@ import com.atlarge.odcsim.Domain
 import com.atlarge.odcsim.SimulationContext
 import com.atlarge.odcsim.flow.EventFlow
 import com.atlarge.odcsim.flow.StateFlow
+import com.atlarge.odcsim.simulationContext
 import com.atlarge.opendc.compute.core.ProcessingUnit
 import com.atlarge.opendc.compute.core.Server
 import com.atlarge.opendc.compute.core.Flavor
 import com.atlarge.opendc.compute.core.MemoryUnit
 import com.atlarge.opendc.compute.core.ServerEvent
 import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.core.execution.ServerContext
 import com.atlarge.opendc.compute.core.execution.ServerManagementContext
 import com.atlarge.opendc.compute.core.execution.ShutdownException
 import com.atlarge.opendc.compute.core.image.EmptyImage
@@ -48,9 +50,11 @@ import com.atlarge.opendc.core.services.ServiceRegistry
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Delay
 import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.FlowPreview
 import kotlinx.coroutines.InternalCoroutinesApi
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.debounce
 import kotlinx.coroutines.intrinsics.startCoroutineCancellable
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.selects.SelectClause0
@@ -62,6 +66,7 @@ import kotlin.math.min
 import kotlinx.coroutines.withContext
 import java.lang.Exception
 import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.round
 import kotlin.random.Random
@@ -113,7 +118,11 @@ public class SimpleBareMetalDriver(
 
     override val node: Flow<Node> = nodeState
 
+    @OptIn(FlowPreview::class)
     override val usage: Flow<Double> = usageState
+        // Debounce changes for 1 ms to prevent emitting two values during the same instant (e.g. a slice finishes and
+        // a new slice starts).
+        .debounce(1)
 
     override val powerDraw: Flow<Double> = powerModel(this)
@@ -252,79 +261,168 @@ public class SimpleBareMetalDriver(
         setNode(nodeState.value.copy(state = newNodeState, server = server))
     }
 
-    private var flush: DisposableHandle? = null
-
     @OptIn(InternalCoroutinesApi::class)
-    override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 {
-        require(burst.size == limit.size) { "Array dimensions do not match" }
+    override fun onRun(
+        batch: List<ServerContext.Slice>,
+        triggerMode: ServerContext.TriggerMode,
+        merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+    ): SelectClause0 {
         assert(!finalized) { "Server instance is already finalized" }
 
         return object : SelectClause0 {
            @InternalCoroutinesApi
            override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
-                // If run is called in at the same timestamp as the previous call, cancel the load flush
-                flush?.dispose()
-                flush = null
-
                 val context = select.completion.context
-                val simulationContext = context[SimulationContext]!!
+                val clock = context[SimulationContext]!!.clock
                 val delay = context[ContinuationInterceptor] as Delay
-                val start = simulationContext.clock.millis()
-                var duration = max(0, deadline - start)
-                var totalUsage = 0.0
+                val queue = batch.iterator()
+                var start = Long.MIN_VALUE
+                var currentWork: SliceWork? = null
+                var currentDisposable: DisposableHandle? = null
 
-                // Determine the duration of the first CPU to finish
-                for (i in 0 until min(cpus.size, burst.size)) {
-                    val cpu = cpus[i]
-                    val usage = min(limit[i], cpu.frequency)
-                    val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+                fun schedule(slice: ServerContext.Slice) {
+                    start = clock.millis()
 
-                    totalUsage += usage / cpu.frequency
-
-                    if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
-                        duration = min(duration, cpuDuration)
+                    val isLastSlice = !queue.hasNext()
+                    val work = SliceWork(slice)
+                    val duration = when (triggerMode) {
+                        ServerContext.TriggerMode.FIRST -> min(work.minExit, slice.deadline - start)
+                        ServerContext.TriggerMode.LAST -> min(work.maxExit, slice.deadline - start)
+                        ServerContext.TriggerMode.DEADLINE -> slice.deadline - start
                     }
-                }
 
-                if (!unavailable) {
-                    delay.invokeOnTimeout(1, Runnable {
-                        usageState.value = totalUsage / cpus.size
-                    })
-                }
+                    val action = Runnable {
+                        currentWork = null
+
-                val action = Runnable {
-                    // todo: we could have replaced startCoroutine with startCoroutineUndispatched
-                    // But we need a way to know that Delay.invokeOnTimeout had used the right thread
-                    if (select.trySelect()) {
-                        block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch
+                        // Flush all the work that was performed
+                        val hasFinished = work.stop(duration)
+
+                        if (!isLastSlice) {
+                            val candidateSlice = queue.next()
+                            val nextSlice =
+                                // If our previous slice exceeds its deadline, merge it with the next candidate slice
+                                if (hasFinished)
+                                    candidateSlice
+                                else
+                                    merge(candidateSlice, slice)
+                            schedule(nextSlice)
+                        } else if (select.trySelect()) {
+                            block.startCoroutineCancellable(select.completion)
+                        }
                     }
+
+                    // Schedule the flush after the entire slice has finished
+                    currentDisposable = delay.invokeOnTimeout(duration, action)
+
+                    // Start the slice work
+                    currentWork = work
+                    work.start()
                 }
 
-                val disposable = delay.invokeOnTimeout(duration, action)
-                val flush = DisposableHandle {
-                    val end = simulationContext.clock.millis()
-
-                    // Flush the load if they do not receive a new run call for the same timestamp
-                    flush = delay.invokeOnTimeout(1, Runnable {
-                        usageState.value = 0.0
-                        flush = null
-                    })
-
-                    if (!unavailable) {
-                        // Write back the remaining burst time
-                        for (i in 0 until min(cpus.size, burst.size)) {
-                            val usage = min(limit[i], cpus[i].frequency)
-                            val granted = ceil((end - start) / 1000.0 * usage).toLong()
-                            burst[i] = max(0, burst[i] - granted)
-                        }
+                // Schedule the first work
+                if (queue.hasNext()) {
+                    schedule(queue.next())
+
+                    // A DisposableHandle to flush the work in case the call is cancelled
+                    val flush = DisposableHandle {
+                        val end = clock.millis()
+                        val duration = end - start
+
+                        currentWork?.stop(duration)
+                        currentDisposable?.dispose()
                     }
 
-                    disposable.dispose()
+                    select.disposeOnSelect(flush)
+                } else if (select.trySelect()) {
+                    // No work has been given: select immediately
+                    block.startCoroutineCancellable(select.completion)
                 }
+            }
+        }
+    }
+
+    /**
+     * A slice to be processed.
+     */
+    private inner class SliceWork(val slice: ServerContext.Slice) {
+        /**
+         * The duration after which the first processor finishes processing this slice.
+         */
+        public val minExit: Long
+
+        /**
+         * The duration after which the last processor finishes processing this slice.
+         */
+        public val maxExit: Long
+
+        /**
+         * A flag to indicate that the slice will exceed the deadline.
+         */
+        public val exceedsDeadline: Boolean
+            get() = slice.deadline < maxExit
+
+        /**
+         * The total amount of CPU usage.
+         */
+        public val totalUsage: Double
+
+        init {
+            var totalUsage = 0.0
+            var minExit = Long.MAX_VALUE
+            var maxExit = 0L
+
+            // Determine the duration of the first/last CPU to finish
+            for (i in 0 until min(cpus.size, slice.burst.size)) {
+                val cpu = cpus[i]
+                val usage = min(slice.limit[i], cpu.frequency)
+                val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+                totalUsage += usage
+
+                if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+                    minExit = min(minExit, cpuDuration)
+                    maxExit = max(maxExit, cpuDuration)
+                }
+            }
+
+            this.totalUsage = totalUsage
+            this.minExit = minExit
+            this.maxExit = maxExit
+        }
+
+        /**
+         * Indicate that the work on the slice has started.
+         */
+        public fun start() {
+            usageState.value = totalUsage
+        }
 
-                select.disposeOnSelect(flush)
+        /**
+         * Flush the work performed on the slice.
+         */
+        public fun stop(duration: Long): Boolean {
+            var hasFinished = true
+
+            // Only flush the work if the machine is available
+            if (!unavailable) {
+                for (i in 0 until min(cpus.size, slice.burst.size)) {
+                    val usage = min(slice.limit[i], cpus[i].frequency)
+                    val granted = ceil(duration / 1000.0 * usage).toLong()
+                    val res = max(0, slice.burst[i] - granted)
+                    slice.burst[i] = res
+
+                    if (res != 0L) {
+                        hasFinished = false
+                    }
+                }
+            }
+
+            // Reset the usage of the machine since the slice has finished
+            usageState.value = 0.0
+
+            return hasFinished
+        }
+    }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index ce814dd8..9eab3353 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -265,9 +265,9 @@ class SimpleVirtDriver(
 
             // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
             // time, so not all of the burst may be executed.
-            val interrupted = select<Boolean> {
+            select<Boolean> {
                 schedulingQueue.onReceive { schedulingQueue.offer(it); true }
-                hostContext.onRun(burst, usage, deadline).invoke { false }
+                hostContext.onRun(ServerContext.Slice(burst, usage, deadline)).invoke { false }
             }
 
             val end = clock.millis()
@@ -451,10 +451,9 @@ class SimpleVirtDriver(
             events.close()
         }
 
-        override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
-            require(burst.size == limit.size) { "Array dimensions do not match" }
-            this.deadline = deadline
-            this.burst = burst
+        override suspend fun run(slice: ServerContext.Slice, triggerMode: ServerContext.TriggerMode) {
+            deadline = slice.deadline
+            burst = slice.burst
             val requests = cpus.asSequence()
                 .take(burst.size)
                 .mapIndexed { i, cpu ->
                     CpuRequest(
                         this,
                         cpu,
                         burst[i],
-                        limit[i]
+                        slice.limit[i]
                     )
                 }
                 .toList()
@@ -482,6 +481,10 @@ class SimpleVirtDriver(
         }
 
         @OptIn(InternalCoroutinesApi::class)
-        override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO()
+        override fun onRun(
+            batch: List<ServerContext.Slice>,
+            triggerMode: ServerContext.TriggerMode,
+            merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+        ): SelectClause0 = TODO()
     }
 }
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 0fc64373..1b5d62a2 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,12 +25,15 @@ package com.atlarge.opendc.compute.metal.driver
 
 import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
 import com.atlarge.opendc.compute.core.ProcessingNode
 import com.atlarge.opendc.compute.core.ProcessingUnit
 import com.atlarge.opendc.compute.core.ServerEvent
 import com.atlarge.opendc.compute.core.ServerState
 import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
 import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.withContext
@@ -61,11 +64,18 @@ internal class SimpleBareMetalDriverTest {
         driver.init()
         driver.setImage(image)
         val server = driver.start().server!!
+        driver.usage
+            .onEach { println("${simulationContext.clock.millis()} $it") }
+            .launchIn(this)
         server.events.collect { event ->
             when (event) {
-                is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
+                is ServerEvent.StateChanged -> {
+                    println("${simulationContext.clock.millis()} $event");
+                    finalState = event.server.state
+                }
             }
         }
+    }
 }
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 4f3abc02..318fc279 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -71,6 +71,7 @@ internal class HypervisorTest {
         val node = metalDriver.start()
         node.server?.events?.onEach { println(it) }?.launchIn(this)
 
+        delay(5)
         val flavor = Flavor(1, 0)
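The two trigger modes map onto the `minExit` and `maxExit` values that `SliceWork` derives per slice. Here is a self-contained sketch of that arithmetic with made-up values; per core, the slice takes `ceil(burst / min(limit, frequency) * 1000)` ms, exactly as in the driver code above:

```kotlin
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min

// Standalone sketch of SliceWork's exit-time computation; all values are invented.
fun main() {
    val frequency = doubleArrayOf(2900.0, 2900.0) // core frequencies in MHz
    val limit = doubleArrayOf(2900.0, 1450.0)     // per-core usage caps in MHz
    val burst = longArrayOf(2_900, 2_900)         // requested work per core

    var minExit = Long.MAX_VALUE
    var maxExit = 0L
    for (i in burst.indices) {
        val usage = min(limit[i], frequency[i])
        // Convert from seconds to milliseconds, as in the driver
        val durationMs = ceil(burst[i] / usage * 1000).toLong()
        if (durationMs != 0L) { // cores with a zero burst are ignored
            minExit = min(minExit, durationMs)
            maxExit = max(maxExit, durationMs)
        }
    }
    println("TriggerMode.FIRST fires after $minExit ms, TriggerMode.LAST after $maxExit ms")
    // -> TriggerMode.FIRST fires after 1000 ms, TriggerMode.LAST after 2000 ms
}
```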
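Finally, the `debounce(1)` added to `SimpleBareMetalDriver.usage` deserves a note: when one slice ends and the next begins at the same simulated instant, the flow would otherwise emit both the intermediate `0.0` and the new load. A tiny sketch with plain kotlinx.coroutines (a `MutableStateFlow` stands in for the driver's internal state flow; values are made up):

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
fun main() = runBlocking {
    val usage = MutableStateFlow(0.0)
    val job = usage
        .debounce(1) // suppress same-instant transitions such as 2900.0 -> 0.0 -> 1450.0
        .onEach { println("usage = $it") }
        .launchIn(this)

    usage.value = 2900.0 // a slice starts
    delay(10)
    usage.value = 0.0    // the slice finishes...
    usage.value = 1450.0 // ...and the next one starts at the same instant
    delay(10)
    job.cancel()
    // With debounce, the transient 0.0 between the two slices is never printed.
}
```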