Diffstat (limited to 'opendc')
-rw-r--r--   opendc/opendc-compute/build.gradle.kts                                                                      1
-rw-r--r--   opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt                      15
-rw-r--r--   opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt      20
-rw-r--r--   opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt           320
-rw-r--r--   opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt   54
-rw-r--r--   opendc/opendc-experiments-sc20/src/test/resources/env/single.txt                                             3
6 files changed, 295 insertions, 118 deletions
diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts
index 7d43b064..acdcd5a7 100644
--- a/opendc/opendc-compute/build.gradle.kts
+++ b/opendc/opendc-compute/build.gradle.kts
@@ -36,6 +36,7 @@ dependencies {
implementation("io.github.microutils:kotlin-logging:1.7.9")
testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 30a091b1..c0abdd76 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -4,7 +4,6 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import java.util.UUID
import kotlin.coroutines.coroutineContext
@@ -23,18 +22,20 @@ class VmImage(
val clock = simulationContext.clock
val job = coroutineContext[Job]!!
- for (fragment in flopsHistory) {
+ for (fragments in flopsHistory.chunked(1024)) {
job.ensureActive()
- if (fragment.flops == 0L) {
- delay(fragment.duration)
- } else {
+ var offset = clock.millis()
+
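+            // Convert each fragment in this chunk into a slice whose deadline is the cumulative offset, then submit the chunk as one batch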
+ val batch = fragments.map { fragment ->
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
val burst = LongArray(cores) { fragment.flops / cores }
val usage = DoubleArray(cores) { fragment.usage / cores }
-
- ctx.run(ServerContext.Slice(burst, usage, clock.millis() + fragment.duration))
+ offset += fragment.duration
+ ServerContext.Slice(burst, usage, offset)
}
+
+ ctx.run(batch)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 2d885a8c..48937001 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -28,7 +28,6 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -66,7 +65,6 @@ import kotlin.math.min
import kotlinx.coroutines.withContext
import java.lang.Exception
import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.round
import kotlin.random.Random
/**
@@ -286,16 +284,18 @@ public class SimpleBareMetalDriver(
val isLastSlice = !queue.hasNext()
val work = SliceWork(slice)
- val duration = when (triggerMode) {
- ServerContext.TriggerMode.FIRST -> min(work.minExit, slice.deadline - start)
- ServerContext.TriggerMode.LAST -> min(work.maxExit, slice.deadline - start)
+ val candidateDuration = when (triggerMode) {
+ ServerContext.TriggerMode.FIRST -> work.minExit
+ ServerContext.TriggerMode.LAST -> work.maxExit
ServerContext.TriggerMode.DEADLINE -> slice.deadline - start
}
+ // Check whether the deadline is exceeded during the run of the slice.
+ val duration = min(candidateDuration, slice.deadline - start)
+
val action = Runnable {
currentWork = null
-
// Flush all the work that was performed
val hasFinished = work.stop(duration)
@@ -368,10 +368,16 @@ public class SimpleBareMetalDriver(
*/
public val totalUsage: Double
+ /**
+ * A flag to indicate that this slice is empty.
+ */
+ public val isEmpty: Boolean
+
init {
var totalUsage = 0.0
var minExit = Long.MAX_VALUE
var maxExit = 0L
+ var nonEmpty = false
// Determine the duration of the first/last CPU to finish
for (i in 0 until min(cpus.size, slice.burst.size)) {
@@ -384,9 +390,11 @@ public class SimpleBareMetalDriver(
if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
minExit = min(minExit, cpuDuration)
maxExit = max(maxExit, cpuDuration)
+ nonEmpty = true
}
}
+ this.isEmpty = !nonEmpty
this.totalUsage = totalUsage
this.minExit = minExit
this.maxExit = maxExit
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 9eab3353..b815a7ab 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.compute.virt.driver
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
@@ -35,7 +34,6 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
-import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
@@ -44,6 +42,7 @@ import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -51,15 +50,13 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
-import java.util.Objects
import java.util.TreeSet
import java.util.UUID
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -81,7 +78,7 @@ class SimpleVirtDriver(
/**
* A set for tracking the VM context objects.
*/
- internal val vms: MutableSet<VmServerContext> = mutableSetOf()
+ private val vms: MutableSet<VmServerContext> = mutableSetOf()
/**
* Current total memory use of the images on this hypervisor.
@@ -125,7 +122,7 @@ class SimpleVirtDriver(
ServiceRegistry(), events
)
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, events, simulationContext.domain))
+ vms.add(VmServerContext(server, events))
vmStarted(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
@@ -156,9 +153,9 @@ class SimpleVirtDriver(
*/
private sealed class SchedulerCommand {
/**
- * Schedule the specified vCPUs of a single VM.
+ * Refresh the dirty data structures of the specified VM.
*/
- data class Schedule(val vm: VmServerContext, val requests: Collection<CpuRequest>) : SchedulerCommand()
+ data class Refresh(val vm: Vm) : SchedulerCommand()
/**
* Interrupt the scheduler.
@@ -184,19 +181,25 @@ class SimpleVirtDriver(
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
- val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
- val requests = TreeSet(cpuRequestComparator)
+ val vms = mutableSetOf<Vm>()
+ val vcpus = TreeSet<VCpu>()
val usage = DoubleArray(hostContext.cpus.size)
val burst = LongArray(hostContext.cpus.size)
fun process(command: SchedulerCommand) {
when (command) {
- is SchedulerCommand.Schedule -> {
- vms[command.vm] = command.requests
- requests.removeAll(command.requests)
- requests.addAll(command.requests)
+ is SchedulerCommand.Refresh -> {
+ if (command.vm.isIdle) {
+ vms -= command.vm
+ vcpus.removeAll(command.vm.vcpus)
+ } else {
+ vms += command.vm
+ vcpus.removeAll(command.vm.vcpus)
+ vcpus.addAll(command.vm.vcpus)
+ }
}
+ is SchedulerCommand.Interrupt -> {}
}
}
@@ -210,7 +213,7 @@ class SimpleVirtDriver(
while (!stopped) {
// Wait for a request to be submitted if we have no work yet.
- if (requests.isEmpty()) {
+ if (vcpus.isEmpty()) {
process(schedulingQueue.receive())
}
@@ -226,20 +229,28 @@ class SimpleVirtDriver(
var totalRequestedBurst = 0L
// Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- for ((i, req) in requests.withIndex()) {
- val remaining = requests.size - i
+ for ((i, req) in vcpus.withIndex()) {
+ val remaining = vcpus.size - i
val availableShare = availableUsage / remaining
val grantedUsage = min(req.limit, availableShare)
+ // Take into account the minimum deadline of this slice before we possibly continue
+ deadline = min(deadline, req.vm.deadline)
+
+ // Ignore empty CPUs
+ if (grantedUsage <= 0 || req.burst <= 0) {
+ req.allocatedLimit = 0.0
+ continue
+ }
+
totalRequestedUsage += req.limit
totalRequestedBurst += req.burst
- req.allocatedUsage = grantedUsage
+ req.allocatedLimit = grantedUsage
availableUsage -= grantedUsage
// The duration that we want to run is that of the shortest request from a vCPU
duration = min(duration, req.burst / grantedUsage)
- deadline = min(deadline, req.vm.deadline)
}
// XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
@@ -267,7 +278,7 @@ class SimpleVirtDriver(
// time, so not all of the burst may be executed.
select<Boolean> {
schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- hostContext.onRun(ServerContext.Slice(burst, usage, deadline)).invoke { false }
+ hostContext.onRun(ServerContext.Slice(burst, usage, deadline), ServerContext.TriggerMode.DEADLINE).invoke { false }
}
val end = clock.millis()
@@ -278,7 +289,7 @@ class SimpleVirtDriver(
}
// The total requested burst that the VMs wanted to run in the time-frame that we ran.
- val totalRequestedSubBurst = requests.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
+ val totalRequestedSubBurst = vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
val totalRemainder = burst.sum()
val totalGrantedBurst = totalAllocatedBurst - totalRemainder
@@ -287,19 +298,19 @@ class SimpleVirtDriver(
// The burst that was lost due to interference.
var totalInterferedBurst = 0L
- val entryIterator = vms.entries.iterator()
- while (entryIterator.hasNext()) {
- val (vm, vmRequests) = entryIterator.next()
+ val vmIterator = vms.iterator()
+ while (vmIterator.hasNext()) {
+ val vm = vmIterator.next()
// Apply performance interference model
val performanceModel =
- vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ vm.ctx.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0
var hasFinished = false
- for ((i, req) in vmRequests.withIndex()) {
+ for (vcpu in vm.vcpus) {
// Compute the fraction of compute time allocated to the VM
- val fraction = req.allocatedUsage / totalAllocatedUsage
+ val fraction = vcpu.allocatedLimit / totalAllocatedUsage
// Compute the burst time that the VM was actually granted
val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
@@ -310,25 +321,23 @@ class SimpleVirtDriver(
totalInterferedBurst += grantedBurst - usedBurst
// Compute remaining burst time to be executed for the request
- req.burst = max(0, vm.burst[i] - usedBurst)
- vm.burst[i] = req.burst
-
- if (req.burst <= 0L || req.isCancelled) {
+ if (vcpu.consume(usedBurst)) {
hasFinished = true
} else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
// Request must have its entire burst consumed or otherwise we have overcommission
// Note that we count the overcommissioned burst if the hypervisor has failed.
- totalOvercommissionedBurst += req.burst
+ totalOvercommissionedBurst += vcpu.burst
}
}
if (hasFinished || vm.deadline <= end) {
- // Deschedule all requests from this VM
- entryIterator.remove()
- requests.removeAll(vmRequests)
-
- // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.chan?.resume(Unit)
+ vcpus.removeAll(vm.vcpus)
+ // Mark the VM as finished and deschedule it if needed
+ if (vm.finish()) {
+ vmIterator.remove()
+ } else {
+ vcpus.addAll(vm.vcpus)
+ }
}
}
@@ -349,57 +358,170 @@ class SimpleVirtDriver(
}
/**
- * The [Comparator] for [CpuRequest].
+ * A virtual machine running on the hypervisor.
+ *
+ * @param ctx The execution context the VM runs in.
+ * @param triggerMode The mode that determines when to trigger the VM exit.
+ * @param merge The function to merge consecutive slices on spillover.
+ * @param select The callback invoked when the VM finishes its work.
*/
- private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs ->
- var cmp = lhs.limit.compareTo(rhs.limit)
+ @OptIn(InternalCoroutinesApi::class)
+ private data class Vm(
+ val ctx: VmServerContext,
+ var triggerMode: ServerContext.TriggerMode = ServerContext.TriggerMode.FIRST,
+ var merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice = { _, r -> r },
+ var select: () -> Unit = {}
+ ) {
+ /**
+ * The vCPUs of this virtual machine.
+ */
+ val vcpus: List<VCpu>
+
+ /**
+ * The slices that the VM wants to run.
+ */
+ var queue: Iterator<ServerContext.Slice> = emptyList<ServerContext.Slice>().iterator()
+
+ /**
+ * The current active slice.
+ */
+ var activeSlice: ServerContext.Slice? = null
+
+ /**
+ * The current deadline of the VM.
+ */
+ val deadline: Long
+ get() = activeSlice?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * A flag to indicate that the VM is idle.
+ */
+ val isIdle: Boolean
+ get() = activeSlice == null
- if (cmp != 0) {
- return@Comparator cmp
+ init {
+ vcpus = ctx.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
}
- cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid)
+ /**
+ * Schedule the given slices on this VM, replacing the existing slices.
+ */
+ fun schedule(slices: List<ServerContext.Slice>) {
+ queue = slices.iterator()
- if (cmp != 0) {
- return@Comparator cmp
+ if (queue.hasNext()) {
+ activeSlice = queue.next()
+ }
}
- lhs.vcpu.id.compareTo(rhs.vcpu.id)
+ /**
+ * Cancel the existing workload on the VM.
+ */
+ fun cancel() {
+ queue = emptyList<ServerContext.Slice>().iterator()
+ activeSlice = null
+ }
+
+ /**
+ * Finish the current slice of the VM.
+ *
+ * @return `true` if the vCPUs may be descheduled, `false` otherwise.
+ */
+ fun finish(): Boolean {
+ val activeSlice = activeSlice ?: return true
+
+ return if (queue.hasNext()) {
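+                // If the current slice still has unfinished burst, merge it into the next slice (spillover) before activating it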
+ val needsMerge = activeSlice.burst.any { it > 0 }
+ val candidateSlice = queue.next()
+ val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
+
+ this.activeSlice = slice
+ false
+ } else {
+ this.activeSlice = null
+ select()
+ true
+ }
+ }
}
/**
- * A request to schedule a virtual CPU on the host cpu.
+ * A virtual CPU that can be scheduled on a physical CPU.
+ *
+ * @param vm The VM of which this vCPU is part.
+ * @param model The physical CPU model that this vCPU is based on.
+ * @param id The index of the vCPU within the VM.
*/
- internal data class CpuRequest(
- val vm: VmServerContext,
- val vcpu: ProcessingUnit,
- var burst: Long,
- var limit: Double
- ) {
+ private data class VCpu(
+ val vm: Vm,
+ val model: ProcessingUnit,
+ val id: Int
+ ) : Comparable<VCpu> {
/**
- * The usage that was actually granted.
+ * The current limit on the vCPU.
*/
- var allocatedUsage: Double = 0.0
+ val limit: Double
+ get() = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
/**
- * A flag to indicate the request was cancelled.
+ * The limit allocated by the hypervisor.
*/
- var isCancelled: Boolean = false
+ var allocatedLimit: Double = 0.0
- override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu
- override fun hashCode(): Int = Objects.hash(vm, vcpu)
+ /**
+ * The current burst running on the vCPU.
+ */
+ var burst: Long
+ get() = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
+ set(value) {
+ vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, value)
+ }
+
+ /**
+ * Consume the specified burst on this vCPU.
+ */
+ fun consume(burst: Long): Boolean {
+ this.burst = max(0, this.burst - burst)
+ return this.burst == 0L
+ }
+
+ /**
+ * Compare to another vCPU based on the current load of the vCPU.
+ */
+ override fun compareTo(other: VCpu): Int {
+ var cmp = limit.compareTo(other.limit)
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ cmp = vm.ctx.server.uid.compareTo(other.vm.ctx.server.uid)
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ return id.compareTo(other.id)
+ }
+
+ /**
+ * Create a string representation of the vCPU.
+ */
+ override fun toString(): String =
+ "vCPU(vm=${vm.ctx.server.uid},id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
}
- internal inner class VmServerContext(
- server: Server,
- val events: EventFlow<ServerEvent>,
- val domain: Domain
- ) : ServerManagementContext {
+
+ /**
+ * The execution context in which a VM runs.
+ *
+ * @param server The details of the VM.
+ * @param events The event stream to publish to.
+ */
+ private inner class VmServerContext(server: Server, val events: EventFlow<ServerEvent>) : ServerManagementContext, DisposableHandle {
private var finalized: Boolean = false
- lateinit var burst: LongArray
- var deadline: Long = 0L
- var chan: Continuation<Unit>? = null
private var initialized: Boolean = false
+ private val vm: Vm
internal val job: Job = launch {
delay(1) // TODO Introduce boot time
@@ -408,6 +530,7 @@ class SimpleVirtDriver(
server.image(this@VmServerContext)
exit()
} catch (cause: Throwable) {
+ cause.printStackTrace()
exit(cause)
}
}
@@ -423,6 +546,10 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ init {
+ vm = Vm(this)
+ }
+
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
server = server.copy(services = server.services.put(key, service))
events.emit(ServerEvent.ServicePublished(server, key))
@@ -451,40 +578,33 @@ class SimpleVirtDriver(
events.close()
}
- override suspend fun run(slice: ServerContext.Slice, triggerMode: ServerContext.TriggerMode) {
- deadline = slice.deadline
- burst = slice.burst
-
- val requests = cpus.asSequence()
- .take(burst.size)
- .mapIndexed { i, cpu ->
- CpuRequest(
- this,
- cpu,
- burst[i],
- slice.limit[i]
- )
- }
- .toList()
-
- // Wait until the burst has been run or the coroutine is cancelled
- try {
- schedulingQueue.offer(SchedulerCommand.Schedule(this, requests))
- suspendCoroutine<Unit> { chan = it }
- } catch (e: CancellationException) {
- // Deschedule the VM
- requests.forEach { it.isCancelled = true }
- schedulingQueue.offer(SchedulerCommand.Interrupt)
- suspendCoroutine<Unit> { chan = it }
- e.assertFailure()
- }
- }
-
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
batch: List<ServerContext.Slice>,
triggerMode: ServerContext.TriggerMode,
merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
- ): SelectClause0 = TODO()
+ ): SelectClause0 = object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
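+            // Install the batch on the VM and complete this select clause via the select callback once the batch finishes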
+ vm.triggerMode = triggerMode
+ vm.merge = merge
+ vm.select = {
+ if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+ vm.schedule(batch)
+ // Indicate to the hypervisor that the VM should be re-scheduled
+ schedulingQueue.offer(SchedulerCommand.Refresh(vm))
+ select.disposeOnSelect(this@VmServerContext)
+ }
+ }
+
+ override fun dispose() {
+ if (!vm.isIdle) {
+ vm.cancel()
+ schedulingQueue.offer(SchedulerCommand.Refresh(vm))
+ }
+ }
}
}
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index abd5c961..0a4a56a8 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import java.io.File
import java.util.ServiceLoader
@@ -147,6 +149,48 @@ class Sc20IntegrationTest {
assertEquals(0, monitor.totalInterferedBurst)
}
+ @Test
+ fun small() {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ root.launch {
+ val res = createProvisioner(
+ root,
+ environmentReader,
+ allocationPolicy
+ )
+ scheduler = res.second
+
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ scheduler.terminate()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(96344114723, monitor.totalRequestedBurst) },
+ { assertEquals(96324378235, monitor.totalGrantedBurst) },
+ { assertEquals(19736424, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
+ }
+
/**
* Run the simulation.
*/
@@ -157,20 +201,20 @@ class Sc20IntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(): TraceReader<VmWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
return Sc20ParquetTraceReader(
Sc20RawParquetTraceReader(File("src/test/resources/trace")),
emptyMap(),
- Workload("test", 1.0),
- 0
+ Workload("test", fraction),
+ seed
)
}
/**
* Obtain the environment reader for the test.
*/
- private fun createTestEnvironmentReader(): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt")
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
return Sc20ClusterEnvironmentReader(stream)
}
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+