diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-19 02:00:39 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-20 01:36:02 +0200 |
| commit | ee9f39c98d2d00586ac33767bb3205774981a58f (patch) | |
| tree | 7af08c4dc4c2dffbf2718b2d92990a16fea0b75b | |
| parent | df5d9363e4e3558cb6e2f7f421412548b6d7d36a (diff) | |
perf: Add support for slice batches in VirtDriver
7 files changed, 296 insertions, 118 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 0133832c..e675b877 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -126,6 +126,7 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine * Schedule the specified event to be processed by the engine. */ private fun schedule(@Async.Schedule event: Event) { + assert(event.time >= clock.time) { "Message scheduled in the past [received=${event.time}, actual=${clock.time}]" } queue.add(event) } diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts index 7d43b064..acdcd5a7 100644 --- a/opendc/opendc-compute/build.gradle.kts +++ b/opendc/opendc-compute/build.gradle.kts @@ -36,6 +36,7 @@ dependencies { implementation("io.github.microutils:kotlin-logging:1.7.9") testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 30a091b1..c0abdd76 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -4,7 +4,6 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.Job -import kotlinx.coroutines.delay import kotlinx.coroutines.ensureActive import java.util.UUID import kotlin.coroutines.coroutineContext @@ -23,18 +22,20 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragment in flopsHistory) { + for (fragments in flopsHistory.chunked(1024)) { job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { + var offset = clock.millis() + + val batch = fragments.map { fragment -> val cores = min(fragment.cores, ctx.server.flavor.cpuCount) val burst = LongArray(cores) { fragment.flops / cores } val usage = DoubleArray(cores) { fragment.usage / cores } - - ctx.run(ServerContext.Slice(burst, usage, clock.millis() + fragment.duration)) + offset += fragment.duration + ServerContext.Slice(burst, usage, offset) } + + ctx.run(batch) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 2d885a8c..48937001 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -28,7 +28,6 @@ import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -66,7 +65,6 @@ import kotlin.math.min import kotlinx.coroutines.withContext import java.lang.Exception import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.round import kotlin.random.Random /** @@ -286,16 +284,18 @@ public class SimpleBareMetalDriver( val isLastSlice = !queue.hasNext() val work = SliceWork(slice) - val duration = when (triggerMode) { - ServerContext.TriggerMode.FIRST -> min(work.minExit, slice.deadline - start) - ServerContext.TriggerMode.LAST -> min(work.maxExit, slice.deadline - start) + val candidateDuration = when (triggerMode) { + ServerContext.TriggerMode.FIRST -> work.minExit + ServerContext.TriggerMode.LAST -> work.maxExit ServerContext.TriggerMode.DEADLINE -> slice.deadline - start } + // Check whether the deadline is exceeded during the run of the slice. + val duration = min(candidateDuration, slice.deadline - start) + val action = Runnable { currentWork = null - // Flush all the work that was performed val hasFinished = work.stop(duration) @@ -368,10 +368,16 @@ public class SimpleBareMetalDriver( */ public val totalUsage: Double + /** + * A flag to indicate that this slice is empty. + */ + public val isEmpty: Boolean + init { var totalUsage = 0.0 var minExit = Long.MAX_VALUE var maxExit = 0L + var nonEmpty = false // Determine the duration of the first/last CPU to finish for (i in 0 until min(cpus.size, slice.burst.size)) { @@ -384,9 +390,11 @@ public class SimpleBareMetalDriver( if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst minExit = min(minExit, cpuDuration) maxExit = max(maxExit, cpuDuration) + nonEmpty = true } } + this.isEmpty = !nonEmpty this.totalUsage = totalUsage this.minExit = minExit this.maxExit = maxExit diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 9eab3353..b815a7ab 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.compute.virt.driver -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor @@ -35,7 +34,6 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException -import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey @@ -44,6 +42,7 @@ import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi @@ -51,15 +50,13 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select -import java.util.Objects import java.util.TreeSet import java.util.UUID -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -81,7 +78,7 @@ class SimpleVirtDriver( /** * A set for tracking the VM context objects. */ - internal val vms: MutableSet<VmServerContext> = mutableSetOf() + private val vms: MutableSet<VmServerContext> = mutableSetOf() /** * Current total memory use of the images on this hypervisor. @@ -125,7 +122,7 @@ class SimpleVirtDriver( ServiceRegistry(), events ) availableMemory -= requiredMemory - vms.add(VmServerContext(server, events, simulationContext.domain)) + vms.add(VmServerContext(server, events)) vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server @@ -156,9 +153,9 @@ class SimpleVirtDriver( */ private sealed class SchedulerCommand { /** - * Schedule the specified vCPUs of a single VM. + * Refresh the dirty datastructures of the specified VM. */ - data class Schedule(val vm: VmServerContext, val requests: Collection<CpuRequest>) : SchedulerCommand() + data class Refresh(val vm: Vm) : SchedulerCommand() /** * Interrupt the scheduler. @@ -184,19 +181,25 @@ class SimpleVirtDriver( val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } - val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>() - val requests = TreeSet(cpuRequestComparator) + val vms = mutableSetOf<Vm>() + val vcpus = TreeSet<VCpu>() val usage = DoubleArray(hostContext.cpus.size) val burst = LongArray(hostContext.cpus.size) fun process(command: SchedulerCommand) { when (command) { - is SchedulerCommand.Schedule -> { - vms[command.vm] = command.requests - requests.removeAll(command.requests) - requests.addAll(command.requests) + is SchedulerCommand.Refresh -> { + if (command.vm.isIdle) { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } else { + vms += command.vm + vcpus.removeAll(command.vm.vcpus) + vcpus.addAll(command.vm.vcpus) + } } + is SchedulerCommand.Interrupt -> {} } } @@ -210,7 +213,7 @@ class SimpleVirtDriver( while (!stopped) { // Wait for a request to be submitted if we have no work yet. - if (requests.isEmpty()) { + if (vcpus.isEmpty()) { process(schedulingQueue.receive()) } @@ -226,20 +229,28 @@ class SimpleVirtDriver( var totalRequestedBurst = 0L // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in requests.withIndex()) { - val remaining = requests.size - i + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i val availableShare = availableUsage / remaining val grantedUsage = min(req.limit, availableShare) + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + totalRequestedUsage += req.limit totalRequestedBurst += req.burst - req.allocatedUsage = grantedUsage + req.allocatedLimit = grantedUsage availableUsage -= grantedUsage // The duration that we want to run is that of the shortest request from a vCPU duration = min(duration, req.burst / grantedUsage) - deadline = min(deadline, req.vm.deadline) } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. @@ -267,7 +278,7 @@ class SimpleVirtDriver( // time, so not all of the burst may be executed. select<Boolean> { schedulingQueue.onReceive { schedulingQueue.offer(it); true } - hostContext.onRun(ServerContext.Slice(burst, usage, deadline)).invoke { false } + hostContext.onRun(ServerContext.Slice(burst, usage, deadline), ServerContext.TriggerMode.DEADLINE).invoke { false } } val end = clock.millis() @@ -278,7 +289,7 @@ class SimpleVirtDriver( } // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = requests.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRequestedSubBurst = vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() val totalRemainder = burst.sum() val totalGrantedBurst = totalAllocatedBurst - totalRemainder @@ -287,19 +298,19 @@ class SimpleVirtDriver( // The burst that was lost due to interference. var totalInterferedBurst = 0L - val entryIterator = vms.entries.iterator() - while (entryIterator.hasNext()) { - val (vm, vmRequests) = entryIterator.next() + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() // Apply performance interference model val performanceModel = - vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + vm.ctx.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0 var hasFinished = false - for ((i, req) in vmRequests.withIndex()) { + for (vcpu in vm.vcpus) { // Compute the fraction of compute time allocated to the VM - val fraction = req.allocatedUsage / totalAllocatedUsage + val fraction = vcpu.allocatedLimit / totalAllocatedUsage // Compute the burst time that the VM was actually granted val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() @@ -310,25 +321,23 @@ class SimpleVirtDriver( totalInterferedBurst += grantedBurst - usedBurst // Compute remaining burst time to be executed for the request - req.burst = max(0, vm.burst[i] - usedBurst) - vm.burst[i] = req.burst - - if (req.burst <= 0L || req.isCancelled) { + if (vcpu.consume(usedBurst)) { hasFinished = true } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) { // Request must have its entire burst consumed or otherwise we have overcommission // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += req.burst + totalOvercommissionedBurst += vcpu.burst } } if (hasFinished || vm.deadline <= end) { - // Deschedule all requests from this VM - entryIterator.remove() - requests.removeAll(vmRequests) - - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan?.resume(Unit) + vcpus.removeAll(vm.vcpus) + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + } else { + vcpus.addAll(vm.vcpus) + } } } @@ -349,57 +358,170 @@ class SimpleVirtDriver( } /** - * The [Comparator] for [CpuRequest]. + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. */ - private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs -> - var cmp = lhs.limit.compareTo(rhs.limit) + @OptIn(InternalCoroutinesApi::class) + private data class Vm( + val ctx: VmServerContext, + var triggerMode: ServerContext.TriggerMode = ServerContext.TriggerMode.FIRST, + var merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List<VCpu> + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator<ServerContext.Slice> = emptyList<ServerContext.Slice>().iterator() + + /** + * The current active slice. + */ + var activeSlice: ServerContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE + + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null - if (cmp != 0) { - return@Comparator cmp + init { + vcpus = ctx.cpus.mapIndexed { i, model -> VCpu(this, model, i) } } - cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid) + /** + * Schedule the given slices on this vCPU, replacing the existing slices. + */ + fun schedule(slices: List<ServerContext.Slice>) { + queue = slices.iterator() - if (cmp != 0) { - return@Comparator cmp + if (queue.hasNext()) { + activeSlice = queue.next() + } } - lhs.vcpu.id.compareTo(rhs.vcpu.id) + /** + * Cancel the existing workload on the VM. + */ + fun cancel() { + queue = emptyList<ServerContext.Slice>().iterator() + activeSlice = null + } + + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + false + } else { + this.activeSlice = null + select() + true + } + } } /** - * A request to schedule a virtual CPU on the host cpu. + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. */ - internal data class CpuRequest( - val vm: VmServerContext, - val vcpu: ProcessingUnit, - var burst: Long, - var limit: Double - ) { + private data class VCpu( + val vm: Vm, + val model: ProcessingUnit, + val id: Int + ) : Comparable<VCpu> { /** - * The usage that was actually granted. + * The current limit on the vCPU. */ - var allocatedUsage: Double = 0.0 + val limit: Double + get() = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 /** - * A flag to indicate the request was cancelled. + * The limit allocated by the hypervisor. */ - var isCancelled: Boolean = false + var allocatedLimit: Double = 0.0 - override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu - override fun hashCode(): Int = Objects.hash(vm, vcpu) + /** + * The current burst running on the vCPU. + */ + var burst: Long + get() = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + set(value) { + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, value) + } + + /** + * Consume the specified burst on this vCPU. + */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + return this.burst == 0L + } + + /** + * Compare to another vCPU based on the current load of the vCPU. + */ + override fun compareTo(other: VCpu): Int { + var cmp = limit.compareTo(other.limit) + + if (cmp != 0) { + return cmp + } + + cmp = vm.ctx.server.uid.compareTo(other.vm.ctx.server.uid) + + if (cmp != 0) { + return cmp + } + + return id.compareTo(other.id) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(vm=${vm.ctx.server.uid},id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" } - internal inner class VmServerContext( - server: Server, - val events: EventFlow<ServerEvent>, - val domain: Domain - ) : ServerManagementContext { + + /** + * The execution context in which a VM runs. + * + * @param server The details of the VM. + * @param events The event stream to publish to. + */ + private inner class VmServerContext(server: Server, val events: EventFlow<ServerEvent>) : ServerManagementContext, DisposableHandle { private var finalized: Boolean = false - lateinit var burst: LongArray - var deadline: Long = 0L - var chan: Continuation<Unit>? = null private var initialized: Boolean = false + private val vm: Vm internal val job: Job = launch { delay(1) // TODO Introduce boot time @@ -408,6 +530,7 @@ class SimpleVirtDriver( server.image(this@VmServerContext) exit() } catch (cause: Throwable) { + cause.printStackTrace() exit(cause) } } @@ -423,6 +546,10 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + init { + vm = Vm(this) + } + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { server = server.copy(services = server.services.put(key, service)) events.emit(ServerEvent.ServicePublished(server, key)) @@ -451,40 +578,33 @@ class SimpleVirtDriver( events.close() } - override suspend fun run(slice: ServerContext.Slice, triggerMode: ServerContext.TriggerMode) { - deadline = slice.deadline - burst = slice.burst - - val requests = cpus.asSequence() - .take(burst.size) - .mapIndexed { i, cpu -> - CpuRequest( - this, - cpu, - burst[i], - slice.limit[i] - ) - } - .toList() - - // Wait until the burst has been run or the coroutine is cancelled - try { - schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) - suspendCoroutine<Unit> { chan = it } - } catch (e: CancellationException) { - // Deschedule the VM - requests.forEach { it.isCancelled = true } - schedulingQueue.offer(SchedulerCommand.Interrupt) - suspendCoroutine<Unit> { chan = it } - e.assertFailure() - } - } - @OptIn(InternalCoroutinesApi::class) override fun onRun( batch: List<ServerContext.Slice>, triggerMode: ServerContext.TriggerMode, merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice - ): SelectClause0 = TODO() + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + vm.triggerMode = triggerMode + vm.merge = merge + vm.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + vm.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Refresh(vm)) + select.disposeOnSelect(this@VmServerContext) + } + } + + override fun dispose() { + if (!vm.isIdle) { + vm.cancel() + schedulingQueue.offer(SchedulerCommand.Refresh(vm)) + } + } } } diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index abd5c961..0a4a56a8 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import java.io.File import java.util.ServiceLoader @@ -147,6 +149,48 @@ class Sc20IntegrationTest { assertEquals(0, monitor.totalInterferedBurst) } + @Test + fun small() { + val seed = 1 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader(0.5, seed) + val environmentReader = createTestEnvironmentReader("single") + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner( + root, + environmentReader, + allocationPolicy + ) + scheduler = res.second + + + attachMonitor(scheduler, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + scheduler.terminate() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(96344114723, monitor.totalRequestedBurst) }, + { assertEquals(96324378235, monitor.totalGrantedBurst) }, + { assertEquals(19736424, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) + } + /** * Run the simulation. */ @@ -157,20 +201,20 @@ class Sc20IntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { return Sc20ParquetTraceReader( Sc20RawParquetTraceReader(File("src/test/resources/trace")), emptyMap(), - Workload("test", 1.0), - 0 + Workload("test", fraction), + seed ) } /** * Obtain the environment reader for the test. */ - private fun createTestEnvironmentReader(): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") return Sc20ClusterEnvironmentReader(stream) } diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt new file mode 100644 index 00000000..53b3c2d7 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt @@ -0,0 +1,3 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;8;3.2;64;1;64;8 + |
