diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-07 23:59:15 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-01-07 23:59:15 +0100 |
| commit | 42e9a5b5b610f41a03e68f6fc781c54b9402925b (patch) | |
| tree | 650e886239e8983812d14f3108fdc895756a17d0 /simulator | |
| parent | 3441586e4a10fcc6b2f458d16301ae2770d4886a (diff) | |
| parent | 9cf24c9a8d3e96a29d9b111081bc3369aadd490d (diff) | |
Merge pull request #69 from atlarge-research/refactor/workflows-v1
Refactor workflow service to schedule tasks onto VMs
Diffstat (limited to 'simulator')
25 files changed, 637 insertions, 770 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt index ab96e0a3..3d722110 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt @@ -42,6 +42,11 @@ public interface VirtProvisioningService { public suspend fun drivers(): Set<VirtDriver> /** + * The number of hosts available in the system. + */ + public val hostCount: Int + + /** * Submit the specified [Image] to the provisioning service. * * @param name The name of the server to deploy. diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt index 09eec1ef..249979a8 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt @@ -32,11 +32,10 @@ import org.opendc.compute.core.virt.HypervisorEvent import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException import org.opendc.compute.core.virt.driver.VirtDriver import org.opendc.core.services.ServiceRegistry -import org.opendc.simulator.compute.SimExecutionContext -import org.opendc.simulator.compute.SimHypervisor -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import 
org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.utils.flow.EventFlow import java.time.Clock @@ -72,7 +71,7 @@ public class SimVirtDriver( /** * The hypervisor to run multiple workloads. */ - private val hypervisor = SimHypervisor( + private val hypervisor = SimFairSharedHypervisor( coroutineScope, clock, object : SimHypervisor.Listener { @@ -126,7 +125,14 @@ public class SimVirtDriver( events ) availableMemory -= requiredMemory - val vm = VirtualMachine(server, events, hypervisor.createMachine(ctx.machine)) + + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = flavor.cpuCount) + val processingUnits = (0 until flavor.cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, flavor.memorySize)) + + val machine = SimMachineModel(processingUnits, memoryUnits) + val vm = VirtualMachine(server, events, hypervisor.createMachine(machine)) vms.add(vm) vmStarted(vm) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index 0144fd69..17de3de7 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -54,7 +54,8 @@ public class SimVirtProvisioningService( private val clock: Clock, private val provisioningService: ProvisioningService, public val allocationPolicy: AllocationPolicy, - private val tracer: EventTracer + private val tracer: EventTracer, + private val schedulingQuantum: Long = 
300000 // 5 minutes in milliseconds ) : VirtProvisioningService { /** * The logger instance to use. @@ -134,6 +135,8 @@ public class SimVirtProvisioningService( return availableHypervisors.map { it.driver }.toSet() } + override val hostCount: Int = hypervisors.size + override suspend fun deploy( name: String, image: Image, @@ -173,11 +176,10 @@ public class SimVirtProvisioningService( return } - val quantum = 300000 // 5 minutes in milliseconds // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) + val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) val call = coroutineScope.launch { delay(delay) @@ -191,7 +193,7 @@ public class SimVirtProvisioningService( val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - val requiredMemory = imageInstance.image.tags["required-memory"] as Long + val requiredMemory = imageInstance.flavor.memorySize val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) if (selectedHv == null) { diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt index 8defe8b7..4470eab9 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt @@ -40,7 +40,7 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { ): HypervisorView? 
{ return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) + val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount fitsMemory && fitsCpu } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt index a0c61f29..394e87c6 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt @@ -136,7 +136,7 @@ internal class SimVirtDriverTest { assertAll( { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, + { assertEquals(2082000, requestedBurst, "Requested Burst does not match") }, { assertEquals(2013600, grantedBurst, "Granted Burst does not match") }, { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") }, { assertEquals(1200007, scope.currentTime) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 9cf72f18..ee2295d9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(project(":opendc-format")) implementation(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-compute:opendc-compute-simulator")) implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") 
{ exclude("org.jetbrains.kotlin", module = "kotlin-reflect") } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt index 1221c7d3..9ad744f2 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt @@ -28,6 +28,8 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -37,8 +39,6 @@ import org.opendc.workflows.service.WorkflowEvent import org.opendc.workflows.service.WorkflowSchedulerMode import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy -import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import java.io.File @@ -63,21 +63,29 @@ public fun main(args: Array<String>) { val tracer = EventTracer(clock) val schedulerAsync = testScope.async { - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(this, clock) } 
+ val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.construct(testScope, clock) } + + val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000) + + // Wait for the hypervisors to be spawned + delay(10) StageWorkflowService( - this, + testScope, clock, tracer, - environment.platforms[0].zones[0].services[ProvisioningService], + provisioner, mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - resourceFilterPolicy = FunctionalResourceFilterPolicy, - resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 72a2484a..2eedb636 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -148,9 +148,9 @@ class Sc20IntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(207379117949, monitor.totalRequestedBurst) }, - { assertEquals(203388071813, monitor.totalGrantedBurst) }, - { assertEquals(3991046136, monitor.totalOvercommissionedBurst) }, + { assertEquals(207480856422, 
monitor.totalRequestedBurst) }, + { assertEquals(206510493178, monitor.totalGrantedBurst) }, + { assertEquals(336120436, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -195,9 +195,9 @@ class Sc20IntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96344114723, monitor.totalRequestedBurst) }, - { assertEquals(96324378235, monitor.totalGrantedBurst) }, - { assertEquals(19736424, monitor.totalOvercommissionedBurst) }, + { assertEquals(96410877173, monitor.totalRequestedBurst) }, + { assertEquals(96046583992, monitor.totalGrantedBurst) }, + { assertEquals(19265632, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index a20b4f29..b721905d 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -29,6 +29,7 @@ import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File @@ -122,8 +123,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val workflowId = values[workflowIdCol].trim().toLong() val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() - val runtime = max(0, values[runtimeCol].trim().toLong()) + val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms + val runtime = max(0, values[runtimeCol].trim().toLong()) // s val cores = 
values[coreCol].trim().toInt() val dependencies = values[dependencyCol].split(" ") .filter { it.isNotEmpty() } @@ -140,7 +141,10 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { "<unnamed>", SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), - mapOf(WORKFLOW_TASK_DEADLINE to runtime) + mapOf( + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), ) entry.submissionTime = min(entry.submissionTime, submitTime) (workflow.tasks as MutableSet<Task>).add(task) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index b2931468..381a0b41 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -32,6 +32,7 @@ import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min @@ -82,7 +83,10 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { "<unnamed>", SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), - mapOf(WORKFLOW_TASK_DEADLINE to runtime) + mapOf( + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) entry.submissionTime = min(entry.submissionTime, submitTime) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 
4340708f..5e50a676 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -251,7 +251,7 @@ public class SimBareMetalMachine( this.isEmpty = !nonEmpty this.totalUsage = totalUsage - this.minExit = minExit + this.minExit = if (isEmpty) 0 else minExit this.maxExit = maxExit } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt new file mode 100644 index 00000000..b88871a5 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt @@ -0,0 +1,517 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import kotlinx.coroutines.selects.select +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single + * [SimBareMetalMachine] concurrently using weighted fair sharing. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. + * @param clock The virtual clock to track the simulation time. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimFairSharedHypervisor( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val listener: SimHypervisor.Listener? = null +) : SimHypervisor { + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The channel for scheduling new CPU requests. + */ + private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) + + /** + * Create a [SimMachine] instance on which users may run a [SimWorkload]. + * + * @param model The machine to create. 
+ */ + override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine { + val vm = VmSession(model, performanceInterferenceModel) + val vmCtx = VmExecutionContext(vm) + + return object : SimMachine { + override val model: SimMachineModel + get() = vmCtx.machine + + override val usage: StateFlow<Double> + get() = vm.usage + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? = null + + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(vmCtx) + } finally { + activeWorkload = null + } + } + + override fun toString(): String = "SimVirtualMachine" + } + } + + /** + * Run the scheduling process of the hypervisor. + */ + override suspend fun run(ctx: SimExecutionContext) { + val model = ctx.machine + val maxUsage = model.cpus.sumByDouble { it.frequency } + val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } + + val vms = mutableSetOf<VmSession>() + val vcpus = mutableListOf<VCpu>() + + val usage = DoubleArray(model.cpus.size) + val burst = LongArray(model.cpus.size) + + fun process(command: SchedulerCommand) { + when (command) { + is SchedulerCommand.Schedule -> { + vms += command.vm + vcpus.addAll(command.vm.vcpus) + } + is SchedulerCommand.Deschedule -> { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } + is SchedulerCommand.Interrupt -> { + } + } + } + + fun processRemaining() { + var command = schedulingQueue.poll() + while (command != null) { + process(command) + command = schedulingQueue.poll() + } + } + + while (!stopped) { + // Wait for a request to be submitted if we have no work yet. 
+ if (vcpus.isEmpty()) { + process(schedulingQueue.receive()) + } + + processRemaining() + + val start = clock.millis() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + var availableUsage = maxUsage + var totalRequestedUsage = 0.0 + var totalRequestedBurst = 0L + + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + // Take into account the minimum deadline of this slice before we possibly continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + + totalRequestedUsage += req.limit + totalRequestedBurst += req.burst + + req.allocatedLimit = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / grantedUsage) + } + + val totalAllocatedUsage = maxUsage - availableUsage + var totalAllocatedBurst = 0L + availableUsage = totalAllocatedUsage + val serverLoad = totalAllocatedUsage / maxUsage + + // XXX Ceil duration to eliminate rounding issues + duration = ceil(duration) + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in pCPUs) { + val maxCpuUsage = model.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) + val grantedBurst = ceil(duration * grantedUsage).toLong() + + usage[i] = grantedUsage + burst[i] = grantedBurst + totalAllocatedBurst += grantedBurst + availableUsage -= grantedUsage + } + + // XXX If none of the
VMs require any computation, wait until their deadline, otherwise trigger on the + // first vCPU finished. + val triggerMode = + if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0) + SimExecutionContext.TriggerMode.FIRST + else + SimExecutionContext.TriggerMode.DEADLINE + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val isInterrupted = select<Boolean> { + schedulingQueue.onReceive { schedulingQueue.offer(it); true } + ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode) + .invoke { false } + } + + val end = clock.millis() + + // The total requested burst that the VMs wanted to run in the time-frame that we ran. + val totalRequestedSubBurst = + vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRemainder = burst.sum() + val totalGrantedBurst = totalAllocatedBurst - totalRemainder + + // The burst that was lost due to overcommissioning of CPU resources + var totalOvercommissionedBurst = 0L + // The burst that was lost due to interference. 
+ var totalInterferedBurst = 0L + + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() + + // Apply performance interference model + val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 + var hasFinished = false + + for (vcpu in vm.vcpus) { + // Compute the fraction of compute time allocated to the VM + val fraction = vcpu.allocatedLimit / totalAllocatedUsage + + // Compute the burst time that the VM was actually granted + val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() + + // The burst that was actually used by the VM + val usedBurst = ceil(grantedBurst * performanceScore).toLong() + + totalInterferedBurst += grantedBurst - usedBurst + + // Compute remaining burst time to be executed for the request + if (vcpu.consume(usedBurst)) { + hasFinished = true + } else if (vm.deadline <= end) { + // Request must have its entire burst consumed or otherwise we have overcommission + // Note that we count the overcommissioned burst if the hypervisor has failed. + totalOvercommissionedBurst += vcpu.burst + } + } + + if (hasFinished || vm.deadline <= end) { + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + vcpus.removeAll(vm.vcpus) + } + } + } + + listener?.onSliceFinish( + this, + totalRequestedBurst, + min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing + totalOvercommissionedBurst, + totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, + min( + totalAllocatedUsage, + totalRequestedUsage + ), // The allocated usage might be slightly higher due to FP rounding + totalRequestedUsage + ) + } + } + + /** + * A scheduling command processed by the scheduler. + */ + private sealed class SchedulerCommand { + /** + * Schedule the specified VM on the hypervisor. 
+ */ + data class Schedule(val vm: VmSession) : SchedulerCommand() + + /** + * De-schedule the specified VM on the hypervisor. + */ + data class Deschedule(val vm: VmSession) : SchedulerCommand() + + /** + * Interrupt the scheduler. + */ + object Interrupt : SchedulerCommand() + } + + /** + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. + */ + @OptIn(InternalCoroutinesApi::class) + private data class VmSession( + val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, + var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List<VCpu> + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() + + /** + * The current active slice. + */ + var activeSlice: SimExecutionContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE + + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null + + /** + * The usage of the virtual machine. + */ + val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + init { + vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } + } + + /** + * Schedule the given slices on this vCPU, replacing the existing slices. 
+ */ + fun schedule(slices: Sequence<SimExecutionContext.Slice>) { + queue = slices.iterator() + + if (queue.hasNext()) { + activeSlice = queue.next() + refresh() + } + } + + /** + * Cancel the existing workload on the VM. + */ + fun cancel() { + queue = emptyList<SimExecutionContext.Slice>().iterator() + activeSlice = null + refresh() + } + + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + + // Update the vCPU cache + refresh() + + false + } else { + this.activeSlice = null + select() + true + } + } + + /** + * Refresh the vCPU cache. + */ + fun refresh() { + vcpus.forEach { it.refresh() } + usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size + } + } + + /** + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. + */ + private data class VCpu( + val vm: VmSession, + val model: ProcessingUnit, + val id: Int + ) : Comparable<VCpu> { + /** + * The current limit on the vCPU. + */ + var limit: Double = 0.0 + + /** + * The limit allocated by the hypervisor. + */ + var allocatedLimit: Double = 0.0 + + /** + * The current burst running on the vCPU. + */ + var burst: Long = 0L + + /** + * Consume the specified burst on this vCPU. 
+ */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + + // Flush the result to the slice if it exists + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) + + val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false + return actuallyExists && this.burst == 0L + } + + /** + * Refresh the information of this vCPU based on the current slice. + */ + fun refresh() { + limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 + burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + } + + /** + * Compare to another vCPU based on the current load of the vCPU. + */ + override fun compareTo(other: VCpu): Int { + return limit.compareTo(other.limit) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" + } + + /** + * The execution context in which a VM runs. + * + */ + private inner class VmExecutionContext(val session: VmSession) : + SimExecutionContext, DisposableHandle { + override val machine: SimMachineModel + get() = session.model + + override val clock: Clock + get() = this@SimFairSharedHypervisor.clock + + @OptIn(InternalCoroutinesApi::class) + override fun onRun( + batch: Sequence<SimExecutionContext.Slice>, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + session.triggerMode = triggerMode + session.merge = merge + session.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + session.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Schedule(session)) + 
select.disposeOnSelect(this@VmExecutionContext) + } + } + + override fun dispose() { + if (!session.isIdle) { + session.cancel() + schedulingQueue.offer(SchedulerCommand.Deschedule(session)) + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index 6087227b..fb4cd137 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -22,267 +22,23 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.selects.select import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** - * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently. - * - * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. - * @param clock The virtual clock to track the simulation time. + * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] i + * concurrently. 
*/ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -public class SimHypervisor( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - private val listener: Listener? = null -) : SimWorkload { - /** - * A set for tracking the VM context objects. - */ - private val vms: MutableSet<VmExecutionContext> = mutableSetOf() - - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The channel for scheduling new CPU requests. - */ - private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) - +public interface SimHypervisor : SimWorkload { /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. */ - public fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel? = null): SimMachine { - val vm = VmSession(model, performanceInterferenceModel) - val vmCtx = VmExecutionContext(vm) - - return object : SimMachine { - override val model: SimMachineModel - get() = vmCtx.machine - - override val usage: StateFlow<Double> - get() = vm.usage - - /** - * The current active workload. - */ - private var activeWorkload: SimWorkload? = null - - override suspend fun run(workload: SimWorkload) { - require(activeWorkload == null) { "Run should not be called concurrently" } - - try { - activeWorkload = workload - workload.run(vmCtx) - } finally { - activeWorkload = null - } - } - - override fun toString(): String = "SimVirtualMachine" - } - } - - /** - * Run the scheduling process of the hypervisor. 
- */ - override suspend fun run(ctx: SimExecutionContext) { - val model = ctx.machine - val maxUsage = model.cpus.sumByDouble { it.frequency } - val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } - - val vms = mutableSetOf<VmSession>() - val vcpus = mutableListOf<VCpu>() - - val usage = DoubleArray(model.cpus.size) - val burst = LongArray(model.cpus.size) - - fun process(command: SchedulerCommand) { - when (command) { - is SchedulerCommand.Schedule -> { - vms += command.vm - vcpus.addAll(command.vm.vcpus) - } - is SchedulerCommand.Deschedule -> { - vms -= command.vm - vcpus.removeAll(command.vm.vcpus) - } - is SchedulerCommand.Interrupt -> { - } - } - } - - fun processRemaining() { - var command = schedulingQueue.poll() - while (command != null) { - process(command) - command = schedulingQueue.poll() - } - } - - while (!stopped) { - // Wait for a request to be submitted if we have no work yet. - if (vcpus.isEmpty()) { - process(schedulingQueue.receive()) - } - - processRemaining() - - val start = clock.millis() - - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - var availableUsage = maxUsage - var totalRequestedUsage = 0.0 - var totalRequestedBurst = 0L - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in vcpus.withIndex()) { - val remaining = vcpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, req.vm.deadline) - - // Ignore empty CPUs - if (grantedUsage <= 0 || req.burst <= 0) { - req.allocatedLimit = 0.0 - continue - } - - totalRequestedUsage += req.limit - totalRequestedBurst += req.burst - - 
req.allocatedLimit = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / grantedUsage) - } - - // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = 300.0 - - val totalAllocatedUsage = maxUsage - availableUsage - var totalAllocatedBurst = 0L - availableUsage = totalAllocatedUsage - val serverLoad = totalAllocatedUsage / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCPUs) { - val maxCpuUsage = model.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) - val grantedBurst = ceil(duration * grantedUsage).toLong() - - usage[i] = grantedUsage - burst[i] = grantedBurst - totalAllocatedBurst += grantedBurst - availableUsage -= grantedUsage - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - select<Boolean> { - schedulingQueue.onReceive { schedulingQueue.offer(it); true } - ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE) - .invoke { false } - } - - val end = clock.millis() - - // No work was performed - if ((end - start) <= 0) { - continue - } - - // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = - vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() - val totalRemainder = burst.sum() - val totalGrantedBurst = totalAllocatedBurst - totalRemainder - - // The burst that was lost due to overcommissioning of CPU resources - var totalOvercommissionedBurst = 0L - // The burst that was lost due to interference. 
- var totalInterferedBurst = 0L - - val vmIterator = vms.iterator() - while (vmIterator.hasNext()) { - val vm = vmIterator.next() - - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 - var hasFinished = false - - for (vcpu in vm.vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.allocatedLimit / totalAllocatedUsage - - // Compute the burst time that the VM was actually granted - val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() - - // The burst that was actually used by the VM - val usedBurst = ceil(grantedBurst * performanceScore).toLong() - - totalInterferedBurst += grantedBurst - usedBurst - - // Compute remaining burst time to be executed for the request - if (vcpu.consume(usedBurst)) { - hasFinished = true - } else if (vm.deadline <= end) { - // Request must have its entire burst consumed or otherwise we have overcommission - // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += vcpu.burst - } - } - - if (hasFinished || vm.deadline <= end) { - // Mark the VM as finished and deschedule the VMs if needed - if (vm.finish()) { - vmIterator.remove() - vcpus.removeAll(vm.vcpus) - } - } - } - - listener?.onSliceFinish( - this, - totalRequestedBurst, - min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing - totalOvercommissionedBurst, - totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - min( - totalAllocatedUsage, - totalRequestedUsage - ), // The allocated usage might be slightly higher due to FP rounding - totalRequestedUsage - ) - } - } + public fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? = null + ): SimMachine /** * Event listener for hypervisor events. 
@@ -301,235 +57,4 @@ public class SimHypervisor( cpuDemand: Double ) } - - /** - * A scheduling command processed by the scheduler. - */ - private sealed class SchedulerCommand { - /** - * Schedule the specified VM on the hypervisor. - */ - data class Schedule(val vm: VmSession) : SchedulerCommand() - - /** - * De-schedule the specified VM on the hypervisor. - */ - data class Deschedule(val vm: VmSession) : SchedulerCommand() - - /** - * Interrupt the scheduler. - */ - object Interrupt : SchedulerCommand() - } - - /** - * A virtual machine running on the hypervisor. - * - * @param ctx The execution context the vCPU runs in. - * @param triggerMode The mode when to trigger the VM exit. - * @param merge The function to merge consecutive slices on spillover. - * @param select The function to select on finish. - */ - @OptIn(InternalCoroutinesApi::class) - private data class VmSession( - val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, - var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, - var select: () -> Unit = {} - ) { - /** - * The vCPUs of this virtual machine. - */ - val vcpus: List<VCpu> - - /** - * The slices that the VM wants to run. - */ - var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() - - /** - * The current active slice. - */ - var activeSlice: SimExecutionContext.Slice? = null - - /** - * The current deadline of the VM. - */ - val deadline: Long - get() = activeSlice?.deadline ?: Long.MAX_VALUE - - /** - * A flag to indicate that the VM is idle. - */ - val isIdle: Boolean - get() = activeSlice == null - - /** - * The usage of the virtual machine. 
- */ - val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - init { - vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } - } - - /** - * Schedule the given slices on this vCPU, replacing the existing slices. - */ - fun schedule(slices: Sequence<SimExecutionContext.Slice>) { - queue = slices.iterator() - - if (queue.hasNext()) { - activeSlice = queue.next() - refresh() - } - } - - /** - * Cancel the existing workload on the VM. - */ - fun cancel() { - queue = emptyList<SimExecutionContext.Slice>().iterator() - activeSlice = null - refresh() - } - - /** - * Finish the current slice of the VM. - * - * @return `true` if the vCPUs may be descheduled, `false` otherwise. - */ - fun finish(): Boolean { - val activeSlice = activeSlice ?: return true - - return if (queue.hasNext()) { - val needsMerge = activeSlice.burst.any { it > 0 } - val candidateSlice = queue.next() - val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice - - this.activeSlice = slice - - // Update the vCPU cache - refresh() - - false - } else { - this.activeSlice = null - select() - true - } - } - - /** - * Refresh the vCPU cache. - */ - fun refresh() { - vcpus.forEach { it.refresh() } - usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size - } - } - - /** - * A virtual CPU that can be scheduled on a physical CPU. - * - * @param vm The VM of which this vCPU is part. - * @param model The model of CPU that this vCPU models. - * @param id The id of the vCPU with respect to the VM. - */ - private data class VCpu( - val vm: VmSession, - val model: ProcessingUnit, - val id: Int - ) : Comparable<VCpu> { - /** - * The current limit on the vCPU. - */ - var limit: Double = 0.0 - - /** - * The limit allocated by the hypervisor. - */ - var allocatedLimit: Double = 0.0 - - /** - * The current burst running on the vCPU. - */ - var burst: Long = 0L - - /** - * Consume the specified burst on this vCPU. 
- */ - fun consume(burst: Long): Boolean { - this.burst = max(0, this.burst - burst) - - // Flush the result to the slice if it exists - vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) - - return allocatedLimit > 0.0 && this.burst == 0L - } - - /** - * Refresh the information of this vCPU based on the current slice. - */ - fun refresh() { - limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 - burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 - } - - /** - * Compare to another vCPU based on the current load of the vCPU. - */ - override fun compareTo(other: VCpu): Int { - return limit.compareTo(other.limit) - } - - /** - * Create a string representation of the vCPU. - */ - override fun toString(): String = - "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" - } - - /** - * The execution context in which a VM runs. - * - */ - private inner class VmExecutionContext(val session: VmSession) : - SimExecutionContext, DisposableHandle { - override val machine: SimMachineModel - get() = session.model - - override val clock: Clock - get() = this@SimHypervisor.clock - - @OptIn(InternalCoroutinesApi::class) - override fun onRun( - batch: Sequence<SimExecutionContext.Slice>, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 = object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - session.triggerMode = triggerMode - session.merge = merge - session.select = { - if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - session.schedule(batch) - // Indicate to the hypervisor that the VM should be re-scheduled - schedulingQueue.offer(SchedulerCommand.Schedule(session)) - select.disposeOnSelect(this@VmExecutionContext) - } - } - - override fun dispose() { - if (!session.isIdle) 
{ - session.cancel() - schedulingQueue.offer(SchedulerCommand.Deschedule(session)) - } - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 918a78bd..0d2c9374 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -39,7 +39,7 @@ public class SimFlopsWorkload( public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of flops" } + require(flops >= 0) { "Negative number of FLOPs" } require(cores > 0) { "Negative number of cores or no cores" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } @@ -54,4 +54,6 @@ public class SimFlopsWorkload( ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST) } + + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)" } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 78bd2940..e7fdd4b2 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -105,7 +105,7 @@ internal class SimHypervisorTest { ) val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimHypervisor(scope, 
clock, listener) + val hypervisor = SimFairSharedHypervisor(scope, clock, listener) launch { machine.run(hypervisor) @@ -120,7 +120,7 @@ internal class SimHypervisorTest { assertAll( { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") }, + { Assertions.assertEquals(2082000, listener.totalRequestedBurst, "Requested Burst does not match") }, { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") }, { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") }, { Assertions.assertEquals(1200001, scope.currentTime) } diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index fa03508c..4346efcc 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -32,8 +32,10 @@ dependencies { api(project(":opendc-compute:opendc-compute-core")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging:1.7.9") testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { exclude("org.jetbrains.kotlin", module = "kotlin-reflect") diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 91657f83..2c8d9a0b 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ 
-29,41 +29,43 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy -import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy import org.opendc.workflows.service.stage.task.TaskOrderPolicy import org.opendc.workflows.workload.Job +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import java.time.Clock import java.util.* /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for - * Topology Scheduling. + * Datacenter Scheduling. 
*/ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, internal val tracer: EventTracer, - private val provisioningService: ProvisioningService, + private val provisioningService: VirtProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, - taskOrderPolicy: TaskOrderPolicy, - resourceFilterPolicy: ResourceFilterPolicy, - resourceSelectionPolicy: ResourceSelectionPolicy + taskOrderPolicy: TaskOrderPolicy ) : WorkflowService { + /** + * The logger instance to use. + */ + private val logger = KotlinLogging.logger {} /** * The incoming jobs ready to be processed by the scheduler. @@ -101,25 +103,10 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The nodes that are controlled by the service. - */ - internal lateinit var nodes: List<Node> - - /** - * The available nodes. - */ - internal val available: MutableSet<Node> = mutableSetOf() - - /** - * The maximum number of incoming jobs. - */ - private val throttleLimit: Int = 20000 - - /** * The load of the system. */ internal val load: Double - get() = (available.size / nodes.size.toDouble()) + get() = (activeTasks.size / provisioningService.hostCount.toDouble()) /** * The root listener of this scheduler. 
@@ -170,22 +157,13 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic - private val resourceFilterPolicy: ResourceFilterPolicy.Logic - private val resourceSelectionPolicy: Comparator<Node> init { - coroutineScope.launch { - nodes = provisioningService.nodes().toList() - available.addAll(nodes) - } - this.mode = mode(this) this.jobAdmissionPolicy = jobAdmissionPolicy(this) this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - this.resourceFilterPolicy = resourceFilterPolicy(this) - this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override val events: Flow<WorkflowEvent> = tracer.openRecording().let { @@ -290,26 +268,25 @@ public class StageWorkflowService( // T3 Per task while (taskQueue.isNotEmpty()) { val instance = taskQueue.peek() - val host: Node? = available.firstOrNull() - if (host != null) { - // T4 Submit task to machine - available -= host + val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 + val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task + val image = instance.task.image + coroutineScope.launch { + val server = provisioningService.deploy(instance.task.name, image, flavor) + instance.state = TaskStatus.ACTIVE - val newHost = provisioningService.deploy(host, instance.task.image) - val server = newHost.server!! 
- instance.host = newHost + instance.server = server taskByServer[server] = instance + server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } .launchIn(coroutineScope) - - activeTasks += instance - taskQueue.poll() - rootListener.taskAssigned(instance) - } else { - break } + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) } } @@ -333,7 +310,6 @@ public class StageWorkflowService( task.state = TaskStatus.FINISHED task.finishedAt = clock.millis() job.tasks.remove(task) - available += task.host!! activeTasks -= task tracer.commit( WorkflowEvent.TaskFinished( diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index ed023c82..d1eb6704 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { @@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) { } } - public var host: Node? = null + public var server: Server? = null /** * Mark the specified [TaskView] as terminated. 
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt deleted file mode 100644 index 8dc323ec..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [ResourceSelectionPolicy] that selects the first machine that is available. 
- */ -public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = - Comparator<Node> { _, _ -> 1 } - - override fun toString(): String = "First-Fit" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt deleted file mode 100644 index ac79a9ce..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for - * the task. - */ -public object FunctionalResourceFilterPolicy : ResourceFilterPolicy { - override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = - object : ResourceFilterPolicy.Logic { - override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = - hosts.filter { it in scheduler.available } - } - - override fun toString(): String = "functional" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt deleted file mode 100644 index caf87c70..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import java.util.* - -/** - * A [ResourceSelectionPolicy] that randomly orders the machines. - */ -public object RandomResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> { - private val ids: Map<Node, Long> - - init { - val random = Random(123) - ids = scheduler.nodes.associateWith { random.nextLong() } - } - - override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } - } - - override fun toString(): String = "Random" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt deleted file mode 100644 index 4923a34b..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and - * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic - * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. - */ -public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { - public interface Logic { - /** - * Filter the list of machines based on dynamic information. - * - * @param hosts The hosts to filter. - * @param task The task that is to be scheduled. - * @return The machines on which the task can be scheduled. 
- */ - public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> - } -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt deleted file mode 100644 index 990b990a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected - * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. - */ -public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt index d02e2b4e..4305aa57 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt @@ -28,3 +28,8 @@ package org.opendc.workflows.workload * Meta-data key for the deadline of a task. */ public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline" + +/** + * Meta-data key for the number of cores needed for a task. 
+ */ +public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores" diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 62955a11..b97cb915 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -37,14 +37,14 @@ import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy -import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import kotlin.math.max @@ -74,18 +74,26 @@ internal class StageWorkflowSchedulerIntegrationTest { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) .use { it.construct(testScope, clock) } + val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + 
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000) + + // Wait for the hypervisors to be spawned + delay(10) + StageWorkflowService( testScope, clock, tracer, - environment.platforms[0].zones[0].services[ProvisioningService], + provisioner, mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - resourceFilterPolicy = FunctionalResourceFilterPolicy, - resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) } @@ -110,7 +118,7 @@ internal class StageWorkflowSchedulerIntegrationTest { while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - clock.millis())) + delay(max(0, time - clock.millis())) scheduler.submit(job) } } @@ -118,6 +126,7 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope.advanceUntilIdle() assertAll( + { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) }, { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, |
