| field | value | date / path |
|---|---|---|
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-20 15:59:54 +0200 |
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-20 15:59:54 +0200 |
| commit | 70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (patch) | |
| tree | 10b4d6053d1cd58e921f71ff7b0d6f0cf7bab75a | /opendc |
| parent | ee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 (diff) | |
| parent | 21eafd32c45495ab9e8ebbeffbdbe1d43ffe566b (diff) | |
Merge branch 'perf/batch-slices' into '2.x'
Batch VM slices
See merge request opendc/opendc-simulator!70
Diffstat (limited to 'opendc')
13 files changed, 772 insertions, 244 deletions
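This merge replaces the old per-call `run(burst, limit, deadline)` contract with a `ServerContext.Slice` value plus a `TriggerMode`, and lets an image hand the driver a whole `Sequence<Slice>` at once. The sketch below shows how a workload built from trace fragments could use the batched API; it assumes the `ServerContext`, `Slice`, and `TriggerMode` definitions added in the diff below, while `runTraceAsBatch` and `Fragment` are illustrative names that are not part of the change.

```kotlin
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerContext.Slice
import com.atlarge.opendc.compute.core.execution.ServerContext.TriggerMode
import kotlin.math.min

/** Illustrative stand-in for one trace fragment: total FLOPs, duration (ms) and per-core usage (MHz). */
data class Fragment(val cores: Int, val flops: Long, val duration: Long, val usage: Double)

/**
 * Submit an entire trace as one batch of slices instead of one run() call per fragment,
 * mirroring what the updated VmImage in this change does.
 */
suspend fun runTraceAsBatch(ctx: ServerContext, fragments: List<Fragment>, startTime: Long) {
    var deadline = startTime
    val batch = fragments.asSequence().map { fragment ->
        val cores = min(fragment.cores, ctx.cpus.size)
        val burst = LongArray(cores) { fragment.flops / cores }   // work requested per core
        val limit = DoubleArray(cores) { fragment.usage / cores } // MHz cap per core
        deadline += fragment.duration
        Slice(burst, limit, deadline)
    }

    // Resume only when the last slice triggers; a slice that misses its deadline is
    // superseded by the next one (the default merge function keeps only the new slice).
    ctx.run(batch, triggerMode = TriggerMode.LAST)
}
```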
diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts index 7d43b064..acdcd5a7 100644 --- a/opendc/opendc-compute/build.gradle.kts +++ b/opendc/opendc-compute/build.gradle.kts @@ -36,6 +36,7 @@ dependencies { implementation("io.github.microutils:kotlin-logging:1.7.9") testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 663fa5e4..f770fa49 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -51,36 +51,113 @@ public interface ServerContext { public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) /** - * Request the specified burst time from the processor cores and suspend execution until a processor core finishes - * processing a **non-zero** burst or until the deadline is reached. + * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as + * specified by [triggerMode]. * - * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be - * zero). + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. * - * Both [burst] and [limit] must be of the same size and in any other case the method will throw an - * [IllegalArgumentException]. + * @param slice The representation of work to run on the processors. + * @param triggerMode The trigger condition to resume execution. + */ + public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST) = + select<Unit> { onRun(slice, triggerMode).invoke {} } + + /** + * Ask the processors cores to run the specified [batch] of work slices and suspend execution until the trigger + * condition is met as specified by [triggerMode]. * - * @param burst The burst time to request from each of the processor cores. - * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these + * slices with the next slice to be executed. + * + * @param batch The batch of work to run on the processors. + * @param triggerMode The trigger condition to resume execution. + * @param merge The merge function for consecutive slices in case the last slice was not completed within its + * deadline. 
*/ - public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { - select<Unit> { onRun(burst, limit, deadline).invoke {} } - } + public suspend fun run( + batch: Sequence<Slice>, + triggerMode: TriggerMode = TriggerMode.FIRST, + merge: (Slice, Slice) -> Slice = { _, r -> r } + ) = select<Unit> { onRun(batch, triggerMode, merge).invoke {} } + + /** + * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified + * by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * @param slice The representation of work to request from the processors. + * @param triggerMode The trigger condition to resume execution. + */ + public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 = + onRun(sequenceOf(slice), triggerMode) /** - * Request the specified burst time from the processor cores and suspend execution until a processor core finishes - * processing a **non-zero** burst or until the deadline is reached. + * Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met + * as specified by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. * - * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be - * zero). + * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these + * slices with the next slice to be executed. + * + * @param batch The batch of work to run on the processors. + * @param triggerMode The trigger condition to resume execution during the **last** slice. + * @param merge The merge function for consecutive slices in case the last slice was not completed within its + * deadline. + */ + public fun onRun( + batch: Sequence<Slice>, + triggerMode: TriggerMode = TriggerMode.FIRST, + merge: (Slice, Slice) -> Slice = { _, r -> r } + ): SelectClause0 + + /** + * A request to the host machine for a slice of CPU time from the processor cores. * * Both [burst] and [limit] must be of the same size and in any other case the method will throw an * [IllegalArgumentException]. * + * * @param burst The burst time to request from each of the processor cores. * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. + * @param deadline The instant at which this slice needs to be fulfilled. */ - public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 + public class Slice(val burst: LongArray, val limit: DoubleArray, val deadline: Long) { + init { + require(burst.size == limit.size) { "Incompatible array dimensions" } + } + } + + /** + * The modes for triggering a machine exit from the machine. + */ + public enum class TriggerMode { + /** + * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the + * deadline is reached. 
+ */ + FIRST, + + /** + * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline + * is reached. + */ + LAST, + + /** + * A machine exit occurs only when the deadline is reached. + */ + DEADLINE + } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index e77b55a6..d65e7e94 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,9 +26,7 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.ensureActive import java.util.UUID -import kotlin.coroutines.coroutineContext import kotlin.math.min /** @@ -64,9 +62,6 @@ data class FlopsApplicationImage( val burst = LongArray(cores) { flops / cores } val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - while (burst.any { it != 0L }) { - coroutineContext.ensureActive() - ctx.run(burst, maxUsage, Long.MAX_VALUE) - } + ctx.run(ServerContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = ServerContext.TriggerMode.LAST) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 36bbfa45..c615d865 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,11 +3,7 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.ensureActive import java.util.UUID -import kotlin.coroutines.coroutineContext import kotlin.math.min class VmImage( @@ -21,21 +17,17 @@ class VmImage( override suspend fun invoke(ctx: ServerContext) { val clock = simulationContext.clock - val job = coroutineContext[Job]!! 
+ var offset = clock.millis() - for (fragment in flopsHistory) { - job.ensureActive() - - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } - - ctx.run(burst, usage, clock.millis() + fragment.duration) - } + val batch = flopsHistory.map { fragment -> + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } + offset += fragment.duration + ServerContext.Slice(burst, usage, offset) } + + ctx.run(batch) } override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$maxCores, requiredMemory=$requiredMemory)" diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 08f04760..6a77415c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -34,6 +34,7 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.image.EmptyImage @@ -48,9 +49,12 @@ import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 @@ -61,6 +65,7 @@ import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.withContext import java.lang.Exception +import java.time.Clock import kotlin.coroutines.ContinuationInterceptor import kotlin.random.Random @@ -75,6 +80,7 @@ import kotlin.random.Random * @param memoryUnits The memory units in this machine. * @param powerModel The power model of this machine. */ +@OptIn(ExperimentalCoroutinesApi::class) public class SimpleBareMetalDriver( private val domain: Domain, uid: UUID, @@ -104,7 +110,7 @@ public class SimpleBareMetalDriver( /** * The flow containing the load of the server. */ - private val usageState = StateFlow(0.0) + private val usageState = MutableStateFlow(0.0) /** * The machine state. @@ -113,6 +119,7 @@ public class SimpleBareMetalDriver( override val node: Flow<Node> = nodeState + @OptIn(FlowPreview::class) override val usage: Flow<Double> = usageState override val powerDraw: Flow<Double> = powerModel(this) @@ -252,79 +259,200 @@ public class SimpleBareMetalDriver( setNode(nodeState.value.copy(state = newNodeState, server = server)) } - private var flush: DisposableHandle? 
= null + /** + * A disposable to prevent resetting the usage state for subsequent calls to onRun. + */ + private var usageFlush: DisposableHandle? = null + + /** + * Cache the [Clock] for timing. + */ + private val clock = domain.coroutineContext[SimulationContext]!!.clock + + /** + * Cache the [Delay] instance for timing. + * + * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. + * XXX Note however that this is an ugly hack which may break in the future. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay @OptIn(InternalCoroutinesApi::class) - override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 { - require(burst.size == limit.size) { "Array dimensions do not match" } + override fun onRun( + batch: Sequence<ServerContext.Slice>, + triggerMode: ServerContext.TriggerMode, + merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice + ): SelectClause0 { assert(!finalized) { "Server instance is already finalized" } return object : SelectClause0 { @InternalCoroutinesApi override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - // If run is called in at the same timestamp as the previous call, cancel the load flush - flush?.dispose() - flush = null + // Do not reset the usage state: we will set it ourselves + usageFlush?.dispose() + usageFlush = null + + val queue = batch.iterator() + var start = Long.MIN_VALUE + var currentWork: SliceWork? = null + var currentDisposable: DisposableHandle? = null + + fun schedule(slice: ServerContext.Slice) { + start = clock.millis() + + val isLastSlice = !queue.hasNext() + val work = SliceWork(slice) + val candidateDuration = when (triggerMode) { + ServerContext.TriggerMode.FIRST -> work.minExit + ServerContext.TriggerMode.LAST -> work.maxExit + ServerContext.TriggerMode.DEADLINE -> slice.deadline - start + } + + // Check whether the deadline is exceeded during the run of the slice. + val duration = min(candidateDuration, slice.deadline - start) + + val action = Runnable { + currentWork = null + + // Flush all the work that was performed + val hasFinished = work.stop(duration) + + if (!isLastSlice) { + val candidateSlice = queue.next() + val nextSlice = + // If our previous slice exceeds its deadline, merge it with the next candidate slice + if (hasFinished) + candidateSlice + else + merge(candidateSlice, slice) + schedule(nextSlice) + } else if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + + // Schedule the flush after the entire slice has finished + currentDisposable = delay.invokeOnTimeout(duration, action) - val context = select.completion.context - val simulationContext = context[SimulationContext]!! 
- val delay = context[ContinuationInterceptor] as Delay + // Start the slice work + currentWork = work + work.start() + } - val start = simulationContext.clock.millis() - var duration = max(0, deadline - start) - var totalUsage = 0.0 + // Schedule the first work + if (queue.hasNext()) { + schedule(queue.next()) - // Determine the duration of the first CPU to finish - for (i in 0 until min(cpus.size, burst.size)) { - val cpu = cpus[i] - val usage = min(limit[i], cpu.frequency) - val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + // A DisposableHandle to flush the work in case the call is cancelled + val disposable = DisposableHandle { + val end = clock.millis() + val duration = end - start - totalUsage += usage / cpu.frequency + currentWork?.stop(duration) + currentDisposable?.dispose() - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - duration = min(duration, cpuDuration) + // Schedule reset the usage of the machine since the call is returning + usageFlush = delay.invokeOnTimeout(1, Runnable { + usageState.value = 0.0 + usageFlush = null + }) } - } - if (!unavailable) { - delay.invokeOnTimeout(1, Runnable { - usageState.value = totalUsage / cpus.size - }) + select.disposeOnSelect(disposable) + } else if (select.trySelect()) { + // No work has been given: select immediately + block.startCoroutineCancellable(select.completion) } + } + } + } - val action = Runnable { - // todo: we could have replaced startCoroutine with startCoroutineUndispatched - // But we need a way to know that Delay.invokeOnTimeout had used the right thread - if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch - } + /** + * A slice to be processed. + */ + private inner class SliceWork(val slice: ServerContext.Slice) { + /** + * The duration after which the first processor finishes processing this slice. + */ + public val minExit: Long + + /** + * The duration after which the last processor finishes processing this slice. + */ + public val maxExit: Long + + /** + * A flag to indicate that the slice will exceed the deadline. + */ + public val exceedsDeadline: Boolean + get() = slice.deadline < maxExit + + /** + * The total amount of CPU usage. + */ + public val totalUsage: Double + + /** + * A flag to indicate that this slice is empty. 
+ */ + public val isEmpty: Boolean + + init { + var totalUsage = 0.0 + var minExit = Long.MAX_VALUE + var maxExit = 0L + var nonEmpty = false + + // Determine the duration of the first/last CPU to finish + for (i in 0 until min(cpus.size, slice.burst.size)) { + val cpu = cpus[i] + val usage = min(slice.limit[i], cpu.frequency) + val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + totalUsage += usage / cpu.frequency + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + minExit = min(minExit, cpuDuration) + maxExit = max(maxExit, cpuDuration) + nonEmpty = true } + } - val disposable = delay.invokeOnTimeout(duration, action) - val flush = DisposableHandle { - val end = simulationContext.clock.millis() - - // Flush the load if they do not receive a new run call for the same timestamp - flush = delay.invokeOnTimeout(1, Runnable { - usageState.value = 0.0 - flush = null - }) - - if (!unavailable) { - // Write back the remaining burst time - for (i in 0 until min(cpus.size, burst.size)) { - val usage = min(limit[i], cpus[i].frequency) - val granted = ceil((end - start) / 1000.0 * usage).toLong() - burst[i] = max(0, burst[i] - granted) - } - } + this.isEmpty = !nonEmpty + this.totalUsage = totalUsage + this.minExit = minExit + this.maxExit = maxExit + } - disposable.dispose() - } + /** + * Indicate that the work on the slice has started. + */ + public fun start() { + usageState.value = totalUsage / cpus.size + } - select.disposeOnSelect(flush) + /** + * Flush the work performed on the slice. + */ + public fun stop(duration: Long): Boolean { + var hasFinished = true + + // Only flush the work if the machine is available + if (!unavailable) { + for (i in 0 until min(cpus.size, slice.burst.size)) { + val usage = min(slice.limit[i], cpus[i].frequency) + val granted = ceil(duration / 1000.0 * usage).toLong() + val res = max(0, slice.burst[i] - granted) + slice.burst[i] = res + + if (res != 0L) { + hasFinished = false + } + } } + + return hasFinished } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index ce814dd8..3c41f52e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.compute.virt.driver -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor @@ -35,7 +34,6 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException -import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey @@ -44,6 +42,7 @@ import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DisposableHandle import 
kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi @@ -51,15 +50,12 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select -import java.util.Objects -import java.util.TreeSet import java.util.UUID -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -81,7 +77,7 @@ class SimpleVirtDriver( /** * A set for tracking the VM context objects. */ - internal val vms: MutableSet<VmServerContext> = mutableSetOf() + private val vms: MutableSet<VmServerContext> = mutableSetOf() /** * Current total memory use of the images on this hypervisor. @@ -125,7 +121,7 @@ class SimpleVirtDriver( ServiceRegistry(), events ) availableMemory -= requiredMemory - vms.add(VmServerContext(server, events, simulationContext.domain)) + vms.add(VmServerContext(server, events)) vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server @@ -156,9 +152,14 @@ class SimpleVirtDriver( */ private sealed class SchedulerCommand { /** - * Schedule the specified vCPUs of a single VM. + * Schedule the specified VM on the hypervisor. */ - data class Schedule(val vm: VmServerContext, val requests: Collection<CpuRequest>) : SchedulerCommand() + data class Schedule(val vm: Vm) : SchedulerCommand() + + /** + * De-schedule the specified VM on the hypervisor. + */ + data class Deschedule(val vm: Vm) : SchedulerCommand() /** * Interrupt the scheduler. @@ -184,8 +185,8 @@ class SimpleVirtDriver( val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } - val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>() - val requests = TreeSet(cpuRequestComparator) + val vms = mutableSetOf<Vm>() + val vcpus = mutableListOf<VCpu>() val usage = DoubleArray(hostContext.cpus.size) val burst = LongArray(hostContext.cpus.size) @@ -193,10 +194,14 @@ class SimpleVirtDriver( fun process(command: SchedulerCommand) { when (command) { is SchedulerCommand.Schedule -> { - vms[command.vm] = command.requests - requests.removeAll(command.requests) - requests.addAll(command.requests) + vms += command.vm + vcpus.addAll(command.vm.vcpus) } + is SchedulerCommand.Deschedule -> { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } + is SchedulerCommand.Interrupt -> {} } } @@ -210,7 +215,7 @@ class SimpleVirtDriver( while (!stopped) { // Wait for a request to be submitted if we have no work yet. 
- if (requests.isEmpty()) { + if (vcpus.isEmpty()) { process(schedulingQueue.receive()) } @@ -225,21 +230,33 @@ class SimpleVirtDriver( var totalRequestedUsage = 0.0 var totalRequestedBurst = 0L + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in requests.withIndex()) { - val remaining = requests.size - i + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i val availableShare = availableUsage / remaining val grantedUsage = min(req.limit, availableShare) + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + totalRequestedUsage += req.limit totalRequestedBurst += req.burst - req.allocatedUsage = grantedUsage + req.allocatedLimit = grantedUsage availableUsage -= grantedUsage // The duration that we want to run is that of the shortest request from a vCPU duration = min(duration, req.burst / grantedUsage) - deadline = min(deadline, req.vm.deadline) } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. @@ -265,9 +282,9 @@ class SimpleVirtDriver( // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. - val interrupted = select<Boolean> { + select<Boolean> { schedulingQueue.onReceive { schedulingQueue.offer(it); true } - hostContext.onRun(burst, usage, deadline).invoke { false } + hostContext.onRun(ServerContext.Slice(burst, usage, deadline), ServerContext.TriggerMode.DEADLINE).invoke { false } } val end = clock.millis() @@ -278,7 +295,7 @@ class SimpleVirtDriver( } // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = requests.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRequestedSubBurst = vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() val totalRemainder = burst.sum() val totalGrantedBurst = totalAllocatedBurst - totalRemainder @@ -287,19 +304,19 @@ class SimpleVirtDriver( // The burst that was lost due to interference. var totalInterferedBurst = 0L - val entryIterator = vms.entries.iterator() - while (entryIterator.hasNext()) { - val (vm, vmRequests) = entryIterator.next() + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() // Apply performance interference model val performanceModel = - vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + vm.ctx.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? 
val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0 var hasFinished = false - for ((i, req) in vmRequests.withIndex()) { + for (vcpu in vm.vcpus) { // Compute the fraction of compute time allocated to the VM - val fraction = req.allocatedUsage / totalAllocatedUsage + val fraction = vcpu.allocatedLimit / totalAllocatedUsage // Compute the burst time that the VM was actually granted val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() @@ -310,25 +327,21 @@ class SimpleVirtDriver( totalInterferedBurst += grantedBurst - usedBurst // Compute remaining burst time to be executed for the request - req.burst = max(0, vm.burst[i] - usedBurst) - vm.burst[i] = req.burst - - if (req.burst <= 0L || req.isCancelled) { + if (vcpu.consume(usedBurst)) { hasFinished = true } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) { // Request must have its entire burst consumed or otherwise we have overcommission // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += req.burst + totalOvercommissionedBurst += vcpu.burst } } if (hasFinished || vm.deadline <= end) { - // Deschedule all requests from this VM - entryIterator.remove() - requests.removeAll(vmRequests) - - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan?.resume(Unit) + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + vcpus.removeAll(vm.vcpus) + } } } @@ -349,57 +362,182 @@ class SimpleVirtDriver( } /** - * The [Comparator] for [CpuRequest]. + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. */ - private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs -> - var cmp = lhs.limit.compareTo(rhs.limit) + @OptIn(InternalCoroutinesApi::class) + private data class Vm( + val ctx: VmServerContext, + var triggerMode: ServerContext.TriggerMode = ServerContext.TriggerMode.FIRST, + var merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List<VCpu> + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator<ServerContext.Slice> = emptyList<ServerContext.Slice>().iterator() + + /** + * The current active slice. + */ + var activeSlice: ServerContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE - if (cmp != 0) { - return@Comparator cmp + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null + + init { + vcpus = ctx.cpus.mapIndexed { i, model -> VCpu(this, model, i) } } - cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid) + /** + * Schedule the given slices on this vCPU, replacing the existing slices. + */ + fun schedule(slices: Sequence<ServerContext.Slice>) { + queue = slices.iterator() + + if (queue.hasNext()) { + activeSlice = queue.next() + vcpus.forEach { it.refresh() } + } + } - if (cmp != 0) { - return@Comparator cmp + /** + * Cancel the existing workload on the VM. 
+ */ + fun cancel() { + queue = emptyList<ServerContext.Slice>().iterator() + activeSlice = null + vcpus.forEach { it.refresh() } } - lhs.vcpu.id.compareTo(rhs.vcpu.id) + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + + // Update the vCPU cache + vcpus.forEach { it.refresh() } + + false + } else { + this.activeSlice = null + select() + true + } + } } /** - * A request to schedule a virtual CPU on the host cpu. + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. */ - internal data class CpuRequest( - val vm: VmServerContext, - val vcpu: ProcessingUnit, - var burst: Long, - var limit: Double - ) { + private data class VCpu( + val vm: Vm, + val model: ProcessingUnit, + val id: Int + ) : Comparable<VCpu> { + /** + * The current limit on the vCPU. + */ + var limit: Double = 0.0 + + /** + * The limit allocated by the hypervisor. + */ + var allocatedLimit: Double = 0.0 + /** - * The usage that was actually granted. + * The current burst running on the vCPU. */ - var allocatedUsage: Double = 0.0 + var burst: Long = 0L + + /** + * Consume the specified burst on this vCPU. + */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + + // Flush the result to the slice if it exists + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) + + return allocatedLimit > 0.0 && this.burst == 0L + } + + /** + * Refresh the information of this vCPU based on the current slice. + */ + fun refresh() { + limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 + burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + } /** - * A flag to indicate the request was cancelled. + * Compare to another vCPU based on the current load of the vCPU. */ - var isCancelled: Boolean = false + override fun compareTo(other: VCpu): Int { + var cmp = limit.compareTo(other.limit) - override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu - override fun hashCode(): Int = Objects.hash(vm, vcpu) + if (cmp != 0) { + return cmp + } + + cmp = vm.ctx.server.uid.compareTo(other.vm.ctx.server.uid) + + if (cmp != 0) { + return cmp + } + + return id.compareTo(other.id) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(vm=${vm.ctx.server.uid},id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" } - internal inner class VmServerContext( - server: Server, - val events: EventFlow<ServerEvent>, - val domain: Domain - ) : ServerManagementContext { + /** + * The execution context in which a VM runs. + * + * @param server The details of the VM. + * @param events The event stream to publish to. + */ + private inner class VmServerContext(server: Server, val events: EventFlow<ServerEvent>) : ServerManagementContext, DisposableHandle { private var finalized: Boolean = false - lateinit var burst: LongArray - var deadline: Long = 0L - var chan: Continuation<Unit>? 
= null private var initialized: Boolean = false + private val vm: Vm internal val job: Job = launch { delay(1) // TODO Introduce boot time @@ -423,6 +561,10 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + init { + vm = Vm(this) + } + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { server = server.copy(services = server.services.put(key, service)) events.emit(ServerEvent.ServicePublished(server, key)) @@ -451,37 +593,33 @@ class SimpleVirtDriver( events.close() } - override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { - require(burst.size == limit.size) { "Array dimensions do not match" } - this.deadline = deadline - this.burst = burst - - val requests = cpus.asSequence() - .take(burst.size) - .mapIndexed { i, cpu -> - CpuRequest( - this, - cpu, - burst[i], - limit[i] - ) + @OptIn(InternalCoroutinesApi::class) + override fun onRun( + batch: Sequence<ServerContext.Slice>, + triggerMode: ServerContext.TriggerMode, + merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + vm.triggerMode = triggerMode + vm.merge = merge + vm.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } } - .toList() - - // Wait until the burst has been run or the coroutine is cancelled - try { - schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) - suspendCoroutine<Unit> { chan = it } - } catch (e: CancellationException) { - // Deschedule the VM - requests.forEach { it.isCancelled = true } - schedulingQueue.offer(SchedulerCommand.Interrupt) - suspendCoroutine<Unit> { chan = it } - e.assertFailure() + vm.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Schedule(vm)) + select.disposeOnSelect(this@VmServerContext) } } - @OptIn(InternalCoroutinesApi::class) - override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO() + override fun dispose() { + if (!vm.isIdle) { + vm.cancel() + schedulingQueue.offer(SchedulerCommand.Deschedule(vm)) + } + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index c3d9c745..ff4aa3d7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -180,7 +180,7 @@ class SimpleVirtProvisioningService( } try { - logger.info { "Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -214,7 +214,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} 
finished." } + logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( this@SimpleVirtProvisioningService, @@ -254,6 +254,8 @@ class SimpleVirtProvisioningService( private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { + logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" } + if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) @@ -280,8 +282,14 @@ class SimpleVirtProvisioningService( queuedVms, unscheduledVms )) + + // Re-schedule on the new machine + if (incomingImages.isNotEmpty()) { + requestCycle() + } } ServerState.SHUTOFF, ServerState.ERROR -> { + logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } val hv = hypervisors[server] ?: return availableHypervisors -= hv diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 0fc64373..071c0626 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,12 +25,15 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -61,9 +64,15 @@ internal class SimpleBareMetalDriverTest { driver.init() driver.setImage(image) val server = driver.start().server!! 
+ driver.usage + .onEach { println("${simulationContext.clock.millis()} $it") } + .launchIn(this) server.events.collect { event -> when (event) { - is ServerEvent.StateChanged -> { println(event); finalState = event.server.state } + is ServerEvent.StateChanged -> { + println("${simulationContext.clock.millis()} $event") + finalState = event.server.state + } } } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 4f3abc02..ca00fc94 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -29,6 +29,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -37,7 +39,10 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import java.util.ServiceLoader import java.util.UUID @@ -50,6 +55,7 @@ internal class HypervisorTest { */ @OptIn(ExperimentalCoroutinesApi::class) @Test + @Disabled fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -87,4 +93,75 @@ internal class HypervisorTest { system.terminate() } } + + /** + * Test overcommissioning of a hypervisor. 
+ */ + @Test + fun overcommission() { + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + var requestedBurst = 0L + var grantedBurst = 0L + var overcommissionedBurst = 0L + + root.launch { + val vmm = HypervisorImage + val duration = 5 * 60L + val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), 2, 0) + val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) + ), 2, 0) + + val driverDom = root.newDomain("driver") + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } + val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + + metalDriver.init() + metalDriver.setImage(vmm) + metalDriver.start() + + delay(5) + + val flavor = Flavor(2, 0) + val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] + vmDriver.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> { + requestedBurst += event.requestedBurst + grantedBurst += event.grantedBurst + overcommissionedBurst += event.overcommissionedBurst + } + } + } + .launchIn(this) + + vmDriver.spawn("a", vmImageA, flavor) + vmDriver.spawn("b", vmImageB, flavor) + } + + runBlocking { + system.run() + system.terminate() + } + + assertAll( + { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, + { assertEquals(2013600, grantedBurst, "Granted Burst does not match") }, + { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") } + ) + } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 83952d43..9d2b0247 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -172,7 +172,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp .onEach { event -> when (event) { is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( - simulationContext.clock.millis(), + clock.millis(), event.requestedBurst, event.grantedBurst, event.overcommissionedBurst, @@ -242,10 +242,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP // Monitor server events server.events .onEach { - val time = simulationContext.clock.millis() - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(time, it.server) + monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt 
b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index 138905a4..7f71eb3e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import com.atlarge.opendc.experiments.sc20.experiment.Run @@ -54,9 +53,17 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), run.parent.parent.parent.bufferSize ) - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) } + } + } override fun reportHostStateChange( time: Long, @@ -65,27 +72,31 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { ) { logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) + val previousEvent = currentHostEvent[server] - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) } private val lastPowerConsumption = mutableMapOf<Server, Double>() @@ -106,23 +117,62 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { hostServer: Server, duration: Long ) { - lastServerStates[hostServer] = Pair(hostServer.state, time) + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) - hostWriter.write( - HostEvent( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 - ) - ) + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + 
interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + else -> { + hostWriter.write(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + } } override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { @@ -141,6 +191,12 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { } override fun close() { + // Flush remaining events + for ((_, event) in currentHostEvent) { + hostWriter.write(event) + } + currentHostEvent.clear() + hostWriter.close() provisionerWriter.close() } diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index abd5c961..68c2cbc5 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import java.io.File import java.util.ServiceLoader @@ -134,6 +136,7 @@ class Sc20IntegrationTest { failureDomain?.cancel() scheduler.terminate() + monitor.close() } runSimulation() @@ -147,6 +150,48 @@ class Sc20IntegrationTest { assertEquals(0, monitor.totalInterferedBurst) } + @Test + fun small() { + val seed = 1 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader(0.5, seed) + val environmentReader = createTestEnvironmentReader("single") + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner( + root, + environmentReader, + allocationPolicy + ) + scheduler = res.second + + attachMonitor(scheduler, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(96344114723, monitor.totalRequestedBurst) }, + { assertEquals(96324378235, monitor.totalGrantedBurst) }, + { assertEquals(19736424, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) + } + /** * Run the simulation. */ @@ -157,20 +202,20 @@ class Sc20IntegrationTest { /** * Obtain the trace reader for the test. 
*/ - private fun createTestTraceReader(): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { return Sc20ParquetTraceReader( Sc20RawParquetTraceReader(File("src/test/resources/trace")), emptyMap(), - Workload("test", 1.0), - 0 + Workload("test", fraction), + seed ) } /** * Obtain the environment reader for the test. */ - private fun createTestEnvironmentReader(): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") return Sc20ClusterEnvironmentReader(stream) } @@ -197,6 +242,7 @@ class Sc20IntegrationTest { totalOvercommissionedBurst += overcommissionedBurst totalInterferedBurst += interferedBurst } + override fun close() {} } } diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt new file mode 100644 index 00000000..53b3c2d7 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt @@ -0,0 +1,3 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;8;3.2;64;1;64;8 + |
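For reference, the hypervisor side of this change (`SimpleVirtDriver`) now keeps a single vCPU list and divides the host capacity over it with max-min fair sharing, re-sorting the vCPUs by requested limit every slice. Below is a minimal, self-contained sketch of that sharing step only; the function name and the numbers in `main` are illustrative and do not come from the repository.

```kotlin
import kotlin.math.min

/**
 * Max-min fair sharing: limits are handled in ascending order and each vCPU is granted
 * at most its requested limit, but never more than an equal share of what remains.
 */
fun maxMinFairShare(requestedLimits: List<Double>, hostCapacity: Double): List<Double> {
    var available = hostCapacity
    val sorted = requestedLimits.withIndex().sortedBy { it.value }
    val granted = DoubleArray(requestedLimits.size)

    for ((position, entry) in sorted.withIndex()) {
        val remaining = sorted.size - position
        val equalShare = available / remaining
        val grant = min(entry.value, equalShare)
        granted[entry.index] = grant
        available -= grant
    }
    return granted.toList()
}

fun main() {
    // Two small vCPUs and two large ones competing for a 6400 MHz host (2 x 3200 MHz):
    // the small requests are granted in full, the rest is split evenly.
    println(maxMinFairShare(listOf(500.0, 500.0, 3000.0, 3000.0), 6400.0))
    // [500.0, 500.0, 2700.0, 2700.0]
}
```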
