| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-03 16:29:55 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-03 16:29:55 +0200 |
| commit | c8567a567348e13c341bf1a1ec64ed34ce25815a (patch) | |
| tree | e3818756de1c14846b57bef27adc3cef51a72279 | /simulator |
| parent | ac7016c4c5f15bf20b21e7d34e93d8b963aab231 (diff) | |
Implement VirtDriver using opendc-simulator-compute module
This change adds an implementation of the VirtDriver interface that uses
the functionality provided by the opendc-simulator-compute module.
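In practice, the new driver is wired in by installing a SimVirtDriverWorkload as the image of a bare-metal node; once that image boots, the VirtDriver it exposes can spawn virtual machines. The sketch below is condensed from the SimHypervisorTest added in this commit (assertions omitted; node and image names are illustrative, and it assumes the project's test dependencies):

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.image.SimWorkloadImage
import org.opendc.compute.metal.driver.SimBareMetalDriver
import org.opendc.compute.virt.driver.SimVirtDriverWorkload
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID

@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
    val scope = TestCoroutineScope()
    val clock = DelayControllerClockAdapter(scope)

    // Machine model taken from the tests in this diff: 2 cores at 3200 MHz, 4 x 32 GB.
    val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
    val machineModel = SimMachineModel(
        cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
        memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
    )

    scope.launch {
        // The hypervisor is just another workload image that runs on a bare-metal node.
        val vmmWorkload = SimVirtDriverWorkload()
        val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), vmmWorkload)

        val metalDriver =
            SimBareMetalDriver(this, clock, UUID.randomUUID(), "node-0", emptyMap(), machineModel)
        metalDriver.init()
        metalDriver.setImage(vmm)
        metalDriver.start()
        delay(5) // give the VMM a moment to boot

        // Once the VMM is up, the VirtDriver it exposes can spawn virtual machines.
        val vmImage = SimWorkloadImage(
            UUID.randomUUID(), "vm", emptyMap(),
            SimFlopsWorkload(2_000, 2, utilization = 1.0)
        )
        vmmWorkload.driver.spawn("vm-a", vmImage, Flavor(2, 0)) // 2 vCPUs, no memory reservation
    }

    scope.advanceUntilIdle()
}
```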
Diffstat (limited to 'simulator')
12 files changed, 1378 insertions, 238 deletions
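Several of the new files below wrap a SimExecutionContext via Kotlin interface delegation so that workloads can also see the Server they run on (ComputeSimExecutionContext). A minimal, self-contained sketch of that pattern, using simplified stand-in interfaces rather than the real OpenDC types:

```kotlin
// Stand-in for the simulator-facing execution context.
interface SimExecutionContext {
    val clockMillis: Long
}

// Stand-in for ComputeSimExecutionContext: the same context plus the owning server.
interface ComputeSimExecutionContext : SimExecutionContext {
    val server: String // the real interface exposes a Server object
}

// Wrap an existing context: every member except `server` is delegated to `ctx`.
fun withServer(ctx: SimExecutionContext, owner: String): ComputeSimExecutionContext =
    object : ComputeSimExecutionContext, SimExecutionContext by ctx {
        override val server: String = owner
    }

fun main() {
    val base = object : SimExecutionContext {
        override val clockMillis: Long = 1_000L
    }
    val wrapped = withServer(base, "node-0")
    // The delegated member and the added member are both available.
    println("${wrapped.clockMillis} -> ${wrapped.server}")
}
```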
diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ComputeSimExecutionContext.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ComputeSimExecutionContext.kt new file mode 100644 index 00000000..3295a8e8 --- /dev/null +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ComputeSimExecutionContext.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.execution + +import org.opendc.compute.core.Server +import org.opendc.simulator.compute.SimExecutionContext + +/** + * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run. + */ +public interface ComputeSimExecutionContext : SimExecutionContext { + /** + * The server on which the image runs. 
+ */ + public val server: Server +} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt index e9346a6c..4f440342 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt @@ -28,6 +28,7 @@ import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState +import org.opendc.compute.core.execution.ComputeSimExecutionContext import org.opendc.compute.core.execution.ShutdownException import org.opendc.compute.core.image.EmptyImage import org.opendc.compute.core.image.Image @@ -38,8 +39,10 @@ import org.opendc.compute.metal.NodeState import org.opendc.compute.metal.power.ConstantPowerModel import org.opendc.core.power.PowerModel import org.opendc.core.services.ServiceRegistry -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.SimExecutionContext import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.utils.flow.EventFlow import org.opendc.utils.flow.StateFlow import java.time.Clock @@ -99,9 +102,9 @@ public class SimBareMetalDriver( private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) /** - * The [SimMachine] we use to run the workload. + * The [SimBareMetalMachine] we use to run the workload. */ - private val machine = SimMachine(coroutineScope, clock, machine) + private val machine = SimBareMetalMachine(coroutineScope, clock, machine) /** * The [Job] that runs the simulated workload. @@ -136,11 +139,22 @@ public class SimBareMetalDriver( events ) + // Wrap the workload to pass in a ComputeSimExecutionContext + val workload = object : SimWorkload { + override suspend fun run(ctx: SimExecutionContext) { + val wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { + override val server: Server + get() = nodeState.value.server!! 
+ } + (node.image as SimWorkloadImage).workload.run(wrappedCtx) + } + } + job = coroutineScope.launch { delay(1) // TODO Introduce boot time initMachine() try { - machine.run((node.image as SimWorkloadImage).workload) + machine.run(workload) exitMachine(null) } catch (_: CancellationException) { // Ignored diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt new file mode 100644 index 00000000..758315aa --- /dev/null +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.virt.driver + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import org.opendc.compute.core.* +import org.opendc.compute.core.execution.ComputeSimExecutionContext +import org.opendc.compute.core.execution.ShutdownException +import org.opendc.compute.core.image.Image +import org.opendc.compute.core.image.SimWorkloadImage +import org.opendc.compute.virt.HypervisorEvent +import org.opendc.core.services.ServiceRegistry +import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimHypervisor +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.utils.flow.EventFlow +import java.time.Clock +import java.util.* + +/** + * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. + */ +public class SimVirtDriver( + private val coroutineScope: CoroutineScope, + clock: Clock, + private val ctx: SimExecutionContext +) : VirtDriver { + + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow<HypervisorEvent>() + + override val events: Flow<HypervisorEvent> = eventFlow + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = ctx.machine.memory.map { it.size }.sum() + + /** + * The hypervisor to run multiple workloads. 
+ */ + private val hypervisor = SimHypervisor( + coroutineScope, + clock, + object : SimHypervisor.Listener { + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + eventFlow.emit( + HypervisorEvent.SliceFinished( + this@SimVirtDriver, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + vms.size, + (ctx as ComputeSimExecutionContext).server + ) + ) + } + } + ) + + /** + * The virtual machines running on the hypervisor. + */ + private val vms = HashSet<VirtualMachine>() + + override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { + val requiredMemory = flavor.memorySize + if (availableMemory - requiredMemory < 0) { + throw InsufficientMemoryOnServerException() + } + require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" } + + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), + name, + emptyMap(), + flavor, + image, + ServerState.BUILD, + ServiceRegistry(), + events + ) + availableMemory -= requiredMemory + vms.add(VirtualMachine(server, events, hypervisor.createMachine(ctx.machine))) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) + return server + } + + /** + * A virtual machine instance that the driver manages. + */ + private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, machine: SimMachine) { + val job = coroutineScope.launch { + val workload = object : SimWorkload { + override suspend fun run(ctx: SimExecutionContext) { + val wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { + override val server: Server + get() = this@VirtualMachine.server + } + (server.image as SimWorkloadImage).workload.run(wrappedCtx) + } + } + + delay(1) // TODO Introduce boot time + init() + try { + machine.run(workload) + exit(null) + } catch (cause: Throwable) { + exit(cause) + } + } + + var server: Server = server + set(value) { + if (field.state != value.state) { + events.emit(ServerEvent.StateChanged(value, field.state)) + } + + field = value + } + + private fun init() { + server = server.copy(state = ServerState.ACTIVE) + } + + private fun exit(cause: Throwable?) 
{ + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + server = server.copy(state = serverState) + availableMemory += server.flavor.memorySize + vms.remove(this) + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory)) + events.close() + } + } + + public suspend fun run() { + hypervisor.run(ctx) + } +} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriverWorkload.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriverWorkload.kt new file mode 100644 index 00000000..91d400a4 --- /dev/null +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriverWorkload.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.virt.driver + +import kotlinx.coroutines.coroutineScope +import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.workload.SimWorkload + +public class SimVirtDriverWorkload : SimWorkload { + public lateinit var driver: SimVirtDriver + + override suspend fun run(ctx: SimExecutionContext) { + coroutineScope { + driver = SimVirtDriver(this, ctx.clock, ctx) + driver.run() + } + } +} diff --git a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt index 369b9538..82a82044 100644 --- a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt @@ -29,7 +29,6 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.Flavor @@ -53,7 +52,6 @@ internal class HypervisorTest { */ @OptIn(ExperimentalCoroutinesApi::class) @Test - @Disabled fun smoke() { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) @@ -75,7 +73,7 @@ internal class HypervisorTest { delay(5) - val flavor = org.opendc.compute.core.Flavor(1, 0) + val flavor = Flavor(1, 0) val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] vmDriver.events.onEach { println(it) }.launchIn(this) val vmA = vmDriver.spawn("a", workloadA, flavor) @@ -140,7 +138,7 @@ internal class HypervisorTest { delay(5) - val flavor = org.opendc.compute.core.Flavor(2, 0) + val flavor = Flavor(2, 0) val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] vmDriver.events .onEach { event -> diff --git a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/SimHypervisorTest.kt b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/SimHypervisorTest.kt new file mode 100644 index 00000000..9d33d395 --- /dev/null +++ b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/SimHypervisorTest.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.virt + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.image.SimWorkloadImage +import org.opendc.compute.metal.driver.SimBareMetalDriver +import org.opendc.compute.virt.driver.SimVirtDriverWorkload +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.time.Clock +import java.util.UUID + +/** + * Basic test-suite for the hypervisor. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimHypervisorTest { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var machineModel: SimMachineModel + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + /** + * Test overcommissioning of a hypervisor. + */ + @Test + fun overcommission() { + var requestedBurst = 0L + var grantedBurst = 0L + var overcommissionedBurst = 0L + + scope.launch { + val virtDriverWorkload = SimVirtDriverWorkload() + val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriverWorkload) + val duration = 5 * 60L + val vmImageA = SimWorkloadImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + ) + ) + val vmImageB = SimWorkloadImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2) + ) + ), + ) + + val metalDriver = + SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) + + metalDriver.init() + metalDriver.setImage(vmm) + metalDriver.start() + + delay(5) + + val flavor = Flavor(2, 0) + val vmDriver = virtDriverWorkload.driver + vmDriver.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> { + requestedBurst += event.requestedBurst + grantedBurst += event.grantedBurst + overcommissionedBurst += event.overcommissionedBurst + } + } + } + .launchIn(this) + + vmDriver.spawn("a", vmImageA, flavor) + vmDriver.spawn("b", vmImageB, flavor) + } + + scope.advanceUntilIdle() + + 
assertAll( + { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, + { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, + { assertEquals(2013600, grantedBurst, "Granted Burst does not match") }, + { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") }, + { assertEquals(1200007, scope.currentTime) } + ) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt new file mode 100644 index 00000000..c6d5bdd1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import org.opendc.simulator.compute.workload.SimWorkload +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A simulated bare-metal machine that is able to run a single workload. + * + * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For + * example. the class expects only a single concurrent call to [run]. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workload in. + * @param clock The virtual clock to track the simulation time. + * @param model The machine model to simulate. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimBareMetalMachine( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + override val model: SimMachineModel +) : SimMachine { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: StateFlow<Double> + get() = usageState + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? 
= null + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(ctx) + } finally { + activeWorkload = null + } + } + + /** + * The execution context in which the workload runs. + */ + private val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override fun onRun( + batch: Sequence<SimExecutionContext.Slice>, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 { + return object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + // Do not reset the usage state: we will set it ourselves + usageFlush?.dispose() + usageFlush = null + + val queue = batch.iterator() + var start = Long.MIN_VALUE + var currentWork: SliceWork? = null + var currentDisposable: DisposableHandle? = null + + fun schedule(slice: SimExecutionContext.Slice) { + start = clock.millis() + + val isLastSlice = !queue.hasNext() + val work = SliceWork(slice) + val candidateDuration = when (triggerMode) { + SimExecutionContext.TriggerMode.FIRST -> work.minExit + SimExecutionContext.TriggerMode.LAST -> work.maxExit + SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start + } + + // Check whether the deadline is exceeded during the run of the slice. + val duration = min(candidateDuration, slice.deadline - start) + + val action = Runnable { + currentWork = null + + // Flush all the work that was performed + val hasFinished = work.stop(duration) + + if (!isLastSlice) { + val candidateSlice = queue.next() + val nextSlice = + // If our previous slice exceeds its deadline, merge it with the next candidate slice + if (hasFinished) + candidateSlice + else + merge(candidateSlice, slice) + schedule(nextSlice) + } else if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + + // Schedule the flush after the entire slice has finished + currentDisposable = delay.invokeOnTimeout(duration, action) + + // Start the slice work + currentWork = work + work.start() + } + + // Schedule the first work + if (queue.hasNext()) { + schedule(queue.next()) + + // A DisposableHandle to flush the work in case the call is cancelled + val disposable = DisposableHandle { + val end = clock.millis() + val duration = end - start + + currentWork?.stop(duration) + currentDisposable?.dispose() + + // Schedule reset the usage of the machine since the call is returning + usageFlush = delay.invokeOnTimeout(1) { + usageState.value = 0.0 + usageFlush = null + } + } + + select.disposeOnSelect(disposable) + } else if (select.trySelect()) { + // No work has been given: select immediately + block.startCoroutineCancellable(select.completion) + } + } + } + } + } + + /** + * The [MutableStateFlow] containing the load of the server. + */ + private val usageState = MutableStateFlow(0.0) + + /** + * A disposable to prevent resetting the usage state for subsequent calls to onRun. + */ + private var usageFlush: DisposableHandle? = null + + /** + * Cache the [Delay] instance for timing. 
+ * + * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. + * XXX Note however that this is an ugly hack which may break in the future. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay + + /** + * A slice to be processed. + */ + private inner class SliceWork(val slice: SimExecutionContext.Slice) { + /** + * The duration after which the first processor finishes processing this slice. + */ + val minExit: Long + + /** + * The duration after which the last processor finishes processing this slice. + */ + val maxExit: Long + + /** + * A flag to indicate that the slice will exceed the deadline. + */ + val exceedsDeadline: Boolean + get() = slice.deadline < maxExit + + /** + * The total amount of CPU usage. + */ + val totalUsage: Double + + /** + * A flag to indicate that this slice is empty. + */ + val isEmpty: Boolean + + init { + var totalUsage = 0.0 + var minExit = Long.MAX_VALUE + var maxExit = 0L + var nonEmpty = false + + // Determine the duration of the first/last CPU to finish + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val cpu = model.cpus[i] + val usage = min(slice.limit[i], cpu.frequency) + val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + totalUsage += usage / cpu.frequency + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + minExit = min(minExit, cpuDuration) + maxExit = max(maxExit, cpuDuration) + nonEmpty = true + } + } + + this.isEmpty = !nonEmpty + this.totalUsage = totalUsage + this.minExit = minExit + this.maxExit = maxExit + } + + /** + * Indicate that the work on the slice has started. + */ + fun start() { + usageState.value = totalUsage / model.cpus.size + } + + /** + * Flush the work performed on the slice. + */ + fun stop(duration: Long): Boolean { + var hasFinished = true + + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val usage = min(slice.limit[i], model.cpus[i].frequency) + val granted = ceil(duration / 1000.0 * usage).toLong() + val res = max(0, slice.burst[i] - granted) + slice.burst[i] = res + + if (res != 0L) { + hasFinished = false + } + } + + return hasFinished + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt new file mode 100644 index 00000000..7c2cfbe3 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -0,0 +1,529 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import kotlinx.coroutines.selects.select +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. + * @param clock The virtual clock to track the simulation time. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimHypervisor( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val listener: Listener? = null +) : SimWorkload { + /** + * A set for tracking the VM context objects. + */ + private val vms: MutableSet<VmExecutionContext> = mutableSetOf() + + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The channel for scheduling new CPU requests. + */ + private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) + + /** + * Create a [SimMachine] instance on which users may run a [SimWorkload]. + * + * @param model The machine to create. + */ + public fun createMachine(model: SimMachineModel): SimMachine { + val vmCtx = VmExecutionContext(model) + + return object : SimMachine { + override val model: SimMachineModel + get() = vmCtx.machine + + override val usage: StateFlow<Double> + get() = vmCtx.session.usage + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? = null + + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(vmCtx) + } finally { + activeWorkload = null + } + } + + override fun toString(): String = "SimVirtualMachine" + } + } + + /** + * Run the scheduling process of the hypervisor. 
+ */ + override suspend fun run(ctx: SimExecutionContext) { + val maxUsage = ctx.machine.cpus.sumByDouble { it.frequency } + val pCPUs = ctx.machine.cpus.indices.sortedBy { ctx.machine.cpus[it].frequency } + + val vms = mutableSetOf<VmSession>() + val vcpus = mutableListOf<VCpu>() + + val usage = DoubleArray(ctx.machine.cpus.size) + val burst = LongArray(ctx.machine.cpus.size) + + fun process(command: SchedulerCommand) { + when (command) { + is SchedulerCommand.Schedule -> { + vms += command.vm + vcpus.addAll(command.vm.vcpus) + } + is SchedulerCommand.Deschedule -> { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } + is SchedulerCommand.Interrupt -> { + } + } + } + + fun processRemaining() { + var command = schedulingQueue.poll() + while (command != null) { + process(command) + command = schedulingQueue.poll() + } + } + + while (!stopped) { + // Wait for a request to be submitted if we have no work yet. + if (vcpus.isEmpty()) { + process(schedulingQueue.receive()) + } + + processRemaining() + + val start = clock.millis() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + var availableUsage = maxUsage + var totalRequestedUsage = 0.0 + var totalRequestedBurst = 0L + + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + + totalRequestedUsage += req.limit + totalRequestedBurst += req.burst + + req.allocatedLimit = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / grantedUsage) + } + + val totalAllocatedUsage = maxUsage - availableUsage + var totalAllocatedBurst = 0L + availableUsage = totalAllocatedUsage + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in pCPUs) { + val maxCpuUsage = ctx.machine.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) + val grantedBurst = ceil(duration * grantedUsage).toLong() + + usage[i] = grantedUsage + burst[i] = grantedBurst + totalAllocatedBurst += grantedBurst + availableUsage -= grantedUsage + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + select<Boolean> { + schedulingQueue.onReceive { schedulingQueue.offer(it); true } + ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE) + .invoke { false } + } + + val end = clock.millis() + + // No work was performed + if ((end - start) <= 0) { + continue + } + + // The total requested burst that the VMs wanted to run in the time-frame that we ran. 
+ val totalRequestedSubBurst = + vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRemainder = burst.sum() + val totalGrantedBurst = totalAllocatedBurst - totalRemainder + + // The burst that was lost due to overcommissioning of CPU resources + var totalOvercommissionedBurst = 0L + // The burst that was lost due to interference. + var totalInterferedBurst = 0L + + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() + + // Apply performance interference model + val performanceScore = 1.0 // TODO Performance interference + var hasFinished = false + + for (vcpu in vm.vcpus) { + // Compute the fraction of compute time allocated to the VM + val fraction = vcpu.allocatedLimit / totalAllocatedUsage + + // Compute the burst time that the VM was actually granted + val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() + + // The burst that was actually used by the VM + val usedBurst = ceil(grantedBurst * performanceScore).toLong() + + totalInterferedBurst += grantedBurst - usedBurst + + // Compute remaining burst time to be executed for the request + if (vcpu.consume(usedBurst)) { + hasFinished = true + } else if (vm.deadline <= end) { + // Request must have its entire burst consumed or otherwise we have overcommission + // Note that we count the overcommissioned burst if the hypervisor has failed. + totalOvercommissionedBurst += vcpu.burst + } + } + + if (hasFinished || vm.deadline <= end) { + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + vcpus.removeAll(vm.vcpus) + } + } + } + + listener?.onSliceFinish( + this, + totalRequestedBurst, + min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing + totalOvercommissionedBurst, + totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, + min( + totalAllocatedUsage, + totalRequestedUsage + ), // The allocated usage might be slightly higher due to FP rounding + totalRequestedUsage + ) + } + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + hypervisor: SimHypervisor, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * A scheduling command processed by the scheduler. + */ + private sealed class SchedulerCommand { + /** + * Schedule the specified VM on the hypervisor. + */ + data class Schedule(val vm: VmSession) : SchedulerCommand() + + /** + * De-schedule the specified VM on the hypervisor. + */ + data class Deschedule(val vm: VmSession) : SchedulerCommand() + + /** + * Interrupt the scheduler. + */ + object Interrupt : SchedulerCommand() + } + + /** + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. 
+ */ + @OptIn(InternalCoroutinesApi::class) + private data class VmSession( + val model: SimMachineModel, + var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, + var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List<VCpu> + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() + + /** + * The current active slice. + */ + var activeSlice: SimExecutionContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE + + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null + + /** + * The usage of the virtual machine. + */ + val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + init { + vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } + } + + /** + * Schedule the given slices on this vCPU, replacing the existing slices. + */ + fun schedule(slices: Sequence<SimExecutionContext.Slice>) { + queue = slices.iterator() + + if (queue.hasNext()) { + activeSlice = queue.next() + refresh() + } + } + + /** + * Cancel the existing workload on the VM. + */ + fun cancel() { + queue = emptyList<SimExecutionContext.Slice>().iterator() + activeSlice = null + refresh() + } + + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + + // Update the vCPU cache + refresh() + + false + } else { + this.activeSlice = null + select() + true + } + } + + /** + * Refresh the vCPU cache. + */ + fun refresh() { + vcpus.forEach { it.refresh() } + usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size + } + } + + /** + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. + */ + private data class VCpu( + val vm: VmSession, + val model: ProcessingUnit, + val id: Int + ) : Comparable<VCpu> { + /** + * The current limit on the vCPU. + */ + var limit: Double = 0.0 + + /** + * The limit allocated by the hypervisor. + */ + var allocatedLimit: Double = 0.0 + + /** + * The current burst running on the vCPU. + */ + var burst: Long = 0L + + /** + * Consume the specified burst on this vCPU. + */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + + // Flush the result to the slice if it exists + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) + + return allocatedLimit > 0.0 && this.burst == 0L + } + + /** + * Refresh the information of this vCPU based on the current slice. + */ + fun refresh() { + limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 + burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + } + + /** + * Compare to another vCPU based on the current load of the vCPU. 
+ */ + override fun compareTo(other: VCpu): Int { + return limit.compareTo(other.limit) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" + } + + /** + * The execution context in which a VM runs. + * + */ + private inner class VmExecutionContext(override val machine: SimMachineModel) : + SimExecutionContext, + DisposableHandle { + private var finalized: Boolean = false + private var initialized: Boolean = false + val session: VmSession = VmSession(machine) + + override val clock: Clock + get() = this@SimHypervisor.clock + + @OptIn(InternalCoroutinesApi::class) + override fun onRun( + batch: Sequence<SimExecutionContext.Slice>, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + session.triggerMode = triggerMode + session.merge = merge + session.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + session.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Schedule(session)) + select.disposeOnSelect(this@VmExecutionContext) + } + } + + override fun dispose() { + if (!session.isIdle) { + session.cancel() + schedulingQueue.offer(SchedulerCommand.Deschedule(session)) + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index df74a5f1..f66085af 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -22,245 +22,27 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance import org.opendc.simulator.compute.workload.SimWorkload -import java.lang.Runnable -import java.time.Clock -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** - * A simulated bare-metal machine that is able to run a single workload. - * - * @param coroutineScope The [CoroutineScope] to run the simulated workload in. - * @param clock The virtual clock to track the simulation time. - * @param model The machine model to simulate. + * A generic machine that is able to run a [SimWorkload]. */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -public class SimMachine( - private val coroutineScope: CoroutineScope, - private val clock: Clock, +@OptIn(ExperimentalCoroutinesApi::class) +public interface SimMachine { + /** + * The model of the machine containing its specifications. + */ public val model: SimMachineModel -) { + /** * A [StateFlow] representing the CPU usage of the simulated machine. 
*/ public val usage: StateFlow<Double> - get() = usageState /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) { - workload.run(ctx) - } - - /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimMachine.model - - override val clock: Clock - get() = this@SimMachine.clock - - override fun onRun( - batch: Sequence<SimExecutionContext.Slice>, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 { - return object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - // Do not reset the usage state: we will set it ourselves - usageFlush?.dispose() - usageFlush = null - - val queue = batch.iterator() - var start = Long.MIN_VALUE - var currentWork: SliceWork? = null - var currentDisposable: DisposableHandle? = null - - fun schedule(slice: SimExecutionContext.Slice) { - start = clock.millis() - - val isLastSlice = !queue.hasNext() - val work = SliceWork(slice) - val candidateDuration = when (triggerMode) { - SimExecutionContext.TriggerMode.FIRST -> work.minExit - SimExecutionContext.TriggerMode.LAST -> work.maxExit - SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start - } - - // Check whether the deadline is exceeded during the run of the slice. - val duration = min(candidateDuration, slice.deadline - start) - - val action = Runnable { - currentWork = null - - // Flush all the work that was performed - val hasFinished = work.stop(duration) - - if (!isLastSlice) { - val candidateSlice = queue.next() - val nextSlice = - // If our previous slice exceeds its deadline, merge it with the next candidate slice - if (hasFinished) - candidateSlice - else - merge(candidateSlice, slice) - schedule(nextSlice) - } else if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - - // Schedule the flush after the entire slice has finished - currentDisposable = delay.invokeOnTimeout(duration, action) - - // Start the slice work - currentWork = work - work.start() - } - - // Schedule the first work - if (queue.hasNext()) { - schedule(queue.next()) - - // A DisposableHandle to flush the work in case the call is cancelled - val disposable = DisposableHandle { - val end = clock.millis() - val duration = end - start - - currentWork?.stop(duration) - currentDisposable?.dispose() - - // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1) { - usageState.value = 0.0 - usageFlush = null - } - } - - select.disposeOnSelect(disposable) - } else if (select.trySelect()) { - // No work has been given: select immediately - block.startCoroutineCancellable(select.completion) - } - } - } - } - } - - /** - * The [MutableStateFlow] containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * A disposable to prevent resetting the usage state for subsequent calls to onRun. - */ - private var usageFlush: DisposableHandle? = null - - /** - * Cache the [Delay] instance for timing. - * - * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. - * XXX Note however that this is an ugly hack which may break in the future. 
- */ - @OptIn(InternalCoroutinesApi::class) - private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay - - /** - * A slice to be processed. - */ - private inner class SliceWork(val slice: SimExecutionContext.Slice) { - /** - * The duration after which the first processor finishes processing this slice. - */ - val minExit: Long - - /** - * The duration after which the last processor finishes processing this slice. - */ - val maxExit: Long - - /** - * A flag to indicate that the slice will exceed the deadline. - */ - val exceedsDeadline: Boolean - get() = slice.deadline < maxExit - - /** - * The total amount of CPU usage. - */ - val totalUsage: Double - - /** - * A flag to indicate that this slice is empty. - */ - val isEmpty: Boolean - - init { - var totalUsage = 0.0 - var minExit = Long.MAX_VALUE - var maxExit = 0L - var nonEmpty = false - - // Determine the duration of the first/last CPU to finish - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val cpu = model.cpus[i] - val usage = min(slice.limit[i], cpu.frequency) - val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds - - totalUsage += usage / cpu.frequency - - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - minExit = min(minExit, cpuDuration) - maxExit = max(maxExit, cpuDuration) - nonEmpty = true - } - } - - this.isEmpty = !nonEmpty - this.totalUsage = totalUsage - this.minExit = minExit - this.maxExit = maxExit - } - - /** - * Indicate that the work on the slice has started. - */ - fun start() { - usageState.value = totalUsage / model.cpus.size - } - - /** - * Flush the work performed on the slice. - */ - fun stop(duration: Long): Boolean { - var hasFinished = true - - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val usage = min(slice.limit[i], model.cpus[i].frequency) - val granted = ceil(duration / 1000.0 * usage).toLong() - val res = max(0, slice.burst[i] - granted) - slice.burst[i] = res - - if (res != 0L) { - hasFinished = false - } - } - - return hasFinished - } - } + public suspend fun run(workload: SimWorkload) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 0ef4130e..2add8cce 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -26,6 +26,9 @@ import org.opendc.simulator.compute.SimExecutionContext /** * A model that characterizes the runtime behavior of some particular workload. + * + * Workloads are stateful objects that may be paused and resumed at a later moment. As such, be careful when using the + * same [SimWorkload] from multiple contexts as only a single concurrent [run] call is expected. 
*/ public interface SimWorkload { /** diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt new file mode 100644 index 00000000..b9cd1b06 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.time.Clock +import java.util.* + +/** + * Test suite for the [SimHypervisor] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimHypervisorTest { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var machineModel: SimMachineModel + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + machineModel = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + /** + * Test overcommissioning of a hypervisor. 
+ */ + @Test + fun overcommission() { + val listener = object : SimHypervisor.Listener { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + } + } + + scope.launch { + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2) + ) + ) + + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimHypervisor(scope, clock, listener) + + launch { + machine.run(hypervisor) + } + + yield() + launch { hypervisor.createMachine(machineModel).run(workloadA) } + launch { hypervisor.createMachine(machineModel).run(workloadB) } + } + + scope.advanceUntilIdle() + + assertAll( + { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, + { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") }, + { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") }, + { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") }, + { Assertions.assertEquals(1200001, scope.currentTime) } + ) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index f6fb8e04..332ca8e9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** - * Test suite for the [SimMachine] class. + * Test suite for the [SimBareMetalMachine] class. 
*/ @OptIn(ExperimentalCoroutinesApi::class) class SimMachineTest { @@ -55,7 +55,7 @@ class SimMachineTest { fun testFlopsWorkload() { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - val machine = SimMachine(testScope, clock, machineModel) + val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) @@ -69,7 +69,7 @@ class SimMachineTest { fun testUsage() { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - val machine = SimMachine(testScope, clock, machineModel) + val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) |
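The scheduling loop of the new SimHypervisor divides the host's CPU capacity over the vCPU requests using max-min fair sharing (smallest requests first, each capped at an equal share of what remains). A standalone sketch of that allocation step, with illustrative names that are not part of the OpenDC API:

```kotlin
import kotlin.math.min

// Max-min fair sharing: sort requests ascending and grant each one at most an equal
// share of the capacity that is still available, so unused share spills over to the
// larger requests.
fun maxMinFairShare(capacity: Double, requests: List<Double>): List<Double> {
    var available = capacity
    val granted = DoubleArray(requests.size)

    val order = requests.indices.sortedBy { requests[it] }
    for ((processed, i) in order.withIndex()) {
        val remaining = order.size - processed
        val share = available / remaining
        granted[i] = min(requests[i], share)
        available -= granted[i]
    }
    return granted.toList()
}

fun main() {
    // Two small requests and one large one on a 3200 MHz host:
    // the small requests are satisfied in full, the remainder goes to the large one.
    println(maxMinFairShare(3200.0, listOf(500.0, 800.0, 4000.0)))
    // [500.0, 800.0, 1900.0]
}
```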
