summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-07 23:59:15 +0100
committerGitHub <noreply@github.com>2021-01-07 23:59:15 +0100
commit42e9a5b5b610f41a03e68f6fc781c54b9402925b (patch)
tree650e886239e8983812d14f3108fdc895756a17d0 /simulator
parent3441586e4a10fcc6b2f458d16301ae2770d4886a (diff)
parent9cf24c9a8d3e96a29d9b111081bc3369aadd490d (diff)
Merge pull request #69 from atlarge-research/refactor/workflows-v1
Refactor workflow service to schedule tasks onto VMs
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt5
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt16
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt10
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts1
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt24
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt10
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt6
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt517
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt489
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt4
-rw-r--r--simulator/opendc-workflows/build.gradle.kts2
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt72
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt4
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt36
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt41
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt32
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt5
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt21
25 files changed, 637 insertions, 770 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
index ab96e0a3..3d722110 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
@@ -42,6 +42,11 @@ public interface VirtProvisioningService {
public suspend fun drivers(): Set<VirtDriver>
/**
+ * The number of hosts available in the system.
+ */
+ public val hostCount: Int
+
+ /**
* Submit the specified [Image] to the provisioning service.
*
* @param name The name of the server to deploy.
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
index 09eec1ef..249979a8 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
@@ -32,11 +32,10 @@ import org.opendc.compute.core.virt.HypervisorEvent
import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
import org.opendc.compute.core.virt.driver.VirtDriver
import org.opendc.core.services.ServiceRegistry
-import org.opendc.simulator.compute.SimExecutionContext
-import org.opendc.simulator.compute.SimHypervisor
-import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.utils.flow.EventFlow
import java.time.Clock
@@ -72,7 +71,7 @@ public class SimVirtDriver(
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor = SimHypervisor(
+ private val hypervisor = SimFairSharedHypervisor(
coroutineScope,
clock,
object : SimHypervisor.Listener {
@@ -126,7 +125,14 @@ public class SimVirtDriver(
events
)
availableMemory -= requiredMemory
- val vm = VirtualMachine(server, events, hypervisor.createMachine(ctx.machine))
+
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = flavor.cpuCount)
+ val processingUnits = (0 until flavor.cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, flavor.memorySize))
+
+ val machine = SimMachineModel(processingUnits, memoryUnits)
+ val vm = VirtualMachine(server, events, hypervisor.createMachine(machine))
vms.add(vm)
vmStarted(vm)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index 0144fd69..17de3de7 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -54,7 +54,8 @@ public class SimVirtProvisioningService(
private val clock: Clock,
private val provisioningService: ProvisioningService,
public val allocationPolicy: AllocationPolicy,
- private val tracer: EventTracer
+ private val tracer: EventTracer,
+ private val schedulingQuantum: Long = 300000 // 5 minutes in milliseconds
) : VirtProvisioningService {
/**
* The logger instance to use.
@@ -134,6 +135,8 @@ public class SimVirtProvisioningService(
return availableHypervisors.map { it.driver }.toSet()
}
+ override val hostCount: Int = hypervisors.size
+
override suspend fun deploy(
name: String,
image: Image,
@@ -173,11 +176,10 @@ public class SimVirtProvisioningService(
return
}
- val quantum = 300000 // 5 minutes in milliseconds
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = quantum - (clock.millis() % quantum)
+ val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
val call = coroutineScope.launch {
delay(delay)
@@ -191,7 +193,7 @@ public class SimVirtProvisioningService(
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
- val requiredMemory = imageInstance.image.tags["required-memory"] as Long
+ val requiredMemory = imageInstance.flavor.memorySize
val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
if (selectedHv == null) {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
index 8defe8b7..4470eab9 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
@@ -40,7 +40,7 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
): HypervisorView? {
return hypervisors.asSequence()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long)
+ val fitsMemory = hv.availableMemory >= (image.flavor.memorySize)
val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount
fitsMemory && fitsCpu
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
index a0c61f29..394e87c6 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
@@ -136,7 +136,7 @@ internal class SimVirtDriverTest {
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(2073600, requestedBurst, "Requested Burst does not match") },
+ { assertEquals(2082000, requestedBurst, "Requested Burst does not match") },
{ assertEquals(2013600, grantedBurst, "Granted Burst does not match") },
{ assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") },
{ assertEquals(1200007, scope.currentTime) }
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
index 9cf72f18..ee2295d9 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(project(":opendc-format"))
implementation(project(":opendc-workflows"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
+ implementation(project(":opendc-compute:opendc-compute-simulator"))
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") {
exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
index 1221c7d3..9ad744f2 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
@@ -28,6 +28,8 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -37,8 +39,6 @@ import org.opendc.workflows.service.WorkflowEvent
import org.opendc.workflows.service.WorkflowSchedulerMode
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
-import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import java.io.File
@@ -63,21 +63,29 @@ public fun main(args: Array<String>) {
val tracer = EventTracer(clock)
val schedulerAsync = testScope.async {
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(this, clock) }
+ val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
+ .use { it.construct(testScope, clock) }
+
+ val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
StageWorkflowService(
- this,
+ testScope,
clock,
tracer,
- environment.platforms[0].zones[0].services[ProvisioningService],
+ provisioner,
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- resourceFilterPolicy = FunctionalResourceFilterPolicy,
- resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 72a2484a..2eedb636 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -148,9 +148,9 @@ class Sc20IntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(207379117949, monitor.totalRequestedBurst) },
- { assertEquals(203388071813, monitor.totalGrantedBurst) },
- { assertEquals(3991046136, monitor.totalOvercommissionedBurst) },
+ { assertEquals(207480856422, monitor.totalRequestedBurst) },
+ { assertEquals(206510493178, monitor.totalGrantedBurst) },
+ { assertEquals(336120436, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -195,9 +195,9 @@ class Sc20IntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96344114723, monitor.totalRequestedBurst) },
- { assertEquals(96324378235, monitor.totalGrantedBurst) },
- { assertEquals(19736424, monitor.totalOvercommissionedBurst) },
+ { assertEquals(96410877173, monitor.totalRequestedBurst) },
+ { assertEquals(96046583992, monitor.totalGrantedBurst) },
+ { assertEquals(19265632, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index a20b4f29..b721905d 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -29,6 +29,7 @@ import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.workflows.workload.Job
import org.opendc.workflows.workload.Task
+import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
import java.io.BufferedReader
import java.io.File
@@ -122,8 +123,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val workflowId = values[workflowIdCol].trim().toLong()
val taskId = values[taskIdCol].trim().toLong()
- val submitTime = values[submitTimeCol].trim().toLong()
- val runtime = max(0, values[runtimeCol].trim().toLong())
+ val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms
+ val runtime = max(0, values[runtimeCol].trim().toLong()) // s
val cores = values[coreCol].trim().toInt()
val dependencies = values[dependencyCol].split(" ")
.filter { it.isNotEmpty() }
@@ -140,7 +141,10 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
"<unnamed>",
SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)),
HashSet(),
- mapOf(WORKFLOW_TASK_DEADLINE to runtime)
+ mapOf(
+ WORKFLOW_TASK_CORES to cores,
+ WORKFLOW_TASK_DEADLINE to (runtime * 1000)
+ ),
)
entry.submissionTime = min(entry.submissionTime, submitTime)
(workflow.tasks as MutableSet<Task>).add(task)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index b2931468..381a0b41 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -32,6 +32,7 @@ import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.workflows.workload.Job
import org.opendc.workflows.workload.Task
+import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
import java.util.UUID
import kotlin.math.min
@@ -82,7 +83,10 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
"<unnamed>",
SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)),
HashSet(),
- mapOf(WORKFLOW_TASK_DEADLINE to runtime)
+ mapOf(
+ WORKFLOW_TASK_CORES to cores,
+ WORKFLOW_TASK_DEADLINE to runtime
+ )
)
entry.submissionTime = min(entry.submissionTime, submitTime)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 4340708f..5e50a676 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -251,7 +251,7 @@ public class SimBareMetalMachine(
this.isEmpty = !nonEmpty
this.totalUsage = totalUsage
- this.minExit = minExit
+ this.minExit = if (isEmpty) 0 else minExit
this.maxExit = maxExit
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
new file mode 100644
index 00000000..b88871a5
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
@@ -0,0 +1,517 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
+import kotlinx.coroutines.selects.select
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
+ * [SimBareMetalMachine] concurrently using weighted fair sharing.
+ *
+ * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
+ * @param clock The virtual clock to track the simulation time.
+ */
+@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
+public class SimFairSharedHypervisor(
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val listener: SimHypervisor.Listener? = null
+) : SimHypervisor {
+ /**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
+
+ /**
+ * The channel for scheduling new CPU requests.
+ */
+ private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
+
+ /**
+ * Create a [SimMachine] instance on which users may run a [SimWorkload].
+ *
+ * @param model The machine to create.
+ */
+ override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine {
+ val vm = VmSession(model, performanceInterferenceModel)
+ val vmCtx = VmExecutionContext(vm)
+
+ return object : SimMachine {
+ override val model: SimMachineModel
+ get() = vmCtx.machine
+
+ override val usage: StateFlow<Double>
+ get() = vm.usage
+
+ /**
+ * The current active workload.
+ */
+ private var activeWorkload: SimWorkload? = null
+
+ override suspend fun run(workload: SimWorkload) {
+ require(activeWorkload == null) { "Run should not be called concurrently" }
+
+ try {
+ activeWorkload = workload
+ workload.run(vmCtx)
+ } finally {
+ activeWorkload = null
+ }
+ }
+
+ override fun toString(): String = "SimVirtualMachine"
+ }
+ }
+
+ /**
+ * Run the scheduling process of the hypervisor.
+ */
+ override suspend fun run(ctx: SimExecutionContext) {
+ val model = ctx.machine
+ val maxUsage = model.cpus.sumByDouble { it.frequency }
+ val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency }
+
+ val vms = mutableSetOf<VmSession>()
+ val vcpus = mutableListOf<VCpu>()
+
+ val usage = DoubleArray(model.cpus.size)
+ val burst = LongArray(model.cpus.size)
+
+ fun process(command: SchedulerCommand) {
+ when (command) {
+ is SchedulerCommand.Schedule -> {
+ vms += command.vm
+ vcpus.addAll(command.vm.vcpus)
+ }
+ is SchedulerCommand.Deschedule -> {
+ vms -= command.vm
+ vcpus.removeAll(command.vm.vcpus)
+ }
+ is SchedulerCommand.Interrupt -> {
+ }
+ }
+ }
+
+ fun processRemaining() {
+ var command = schedulingQueue.poll()
+ while (command != null) {
+ process(command)
+ command = schedulingQueue.poll()
+ }
+ }
+
+ while (!stopped) {
+ // Wait for a request to be submitted if we have no work yet.
+ if (vcpus.isEmpty()) {
+ process(schedulingQueue.receive())
+ }
+
+ processRemaining()
+
+ val start = clock.millis()
+
+ var duration: Double = Double.POSITIVE_INFINITY
+ var deadline: Long = Long.MAX_VALUE
+ var availableUsage = maxUsage
+ var totalRequestedUsage = 0.0
+ var totalRequestedBurst = 0L
+
+ // Sort the vCPUs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ vcpus.sort()
+
+ // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
+ for ((i, req) in vcpus.withIndex()) {
+ val remaining = vcpus.size - i
+ val availableShare = availableUsage / remaining
+ val grantedUsage = min(req.limit, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, req.vm.deadline)
+
+ // Ignore empty CPUs
+ if (grantedUsage <= 0 || req.burst <= 0) {
+ req.allocatedLimit = 0.0
+ continue
+ }
+
+ totalRequestedUsage += req.limit
+ totalRequestedBurst += req.burst
+
+ req.allocatedLimit = grantedUsage
+ availableUsage -= grantedUsage
+
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, req.burst / grantedUsage)
+ }
+
+ val totalAllocatedUsage = maxUsage - availableUsage
+ var totalAllocatedBurst = 0L
+ availableUsage = totalAllocatedUsage
+ val serverLoad = totalAllocatedUsage / maxUsage
+
+ // / XXX Ceil duration to eliminate rounding issues
+ duration = ceil(duration)
+
+ // Divide the requests over the available capacity of the pCPUs fairly
+ for (i in pCPUs) {
+ val maxCpuUsage = model.cpus[i].frequency
+ val fraction = maxCpuUsage / maxUsage
+ val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
+ val grantedBurst = ceil(duration * grantedUsage).toLong()
+
+ usage[i] = grantedUsage
+ burst[i] = grantedBurst
+ totalAllocatedBurst += grantedBurst
+ availableUsage -= grantedUsage
+ }
+
+ // XXX If none of the VMs require any computation, wait until their deadline, otherwise trigger on the
+ // first vCPU finished.
+ val triggerMode =
+ if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0)
+ SimExecutionContext.TriggerMode.FIRST
+ else
+ SimExecutionContext.TriggerMode.DEADLINE
+
+ // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
+ // time, so not all of the burst may be executed.
+ val isInterrupted = select<Boolean> {
+ schedulingQueue.onReceive { schedulingQueue.offer(it); true }
+ ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode)
+ .invoke { false }
+ }
+
+ val end = clock.millis()
+
+ // The total requested burst that the VMs wanted to run in the time-frame that we ran.
+ val totalRequestedSubBurst =
+ vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
+ val totalRemainder = burst.sum()
+ val totalGrantedBurst = totalAllocatedBurst - totalRemainder
+
+ // The burst that was lost due to overcommissioning of CPU resources
+ var totalOvercommissionedBurst = 0L
+ // The burst that was lost due to interference.
+ var totalInterferedBurst = 0L
+
+ val vmIterator = vms.iterator()
+ while (vmIterator.hasNext()) {
+ val vm = vmIterator.next()
+
+ // Apply performance interference model
+ val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0
+ var hasFinished = false
+
+ for (vcpu in vm.vcpus) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = vcpu.allocatedLimit / totalAllocatedUsage
+
+ // Compute the burst time that the VM was actually granted
+ val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
+
+ // The burst that was actually used by the VM
+ val usedBurst = ceil(grantedBurst * performanceScore).toLong()
+
+ totalInterferedBurst += grantedBurst - usedBurst
+
+ // Compute remaining burst time to be executed for the request
+ if (vcpu.consume(usedBurst)) {
+ hasFinished = true
+ } else if (vm.deadline <= end) {
+ // Request must have its entire burst consumed or otherwise we have overcommission
+ // Note that we count the overcommissioned burst if the hypervisor has failed.
+ totalOvercommissionedBurst += vcpu.burst
+ }
+ }
+
+ if (hasFinished || vm.deadline <= end) {
+ // Mark the VM as finished and deschedule the VMs if needed
+ if (vm.finish()) {
+ vmIterator.remove()
+ vcpus.removeAll(vm.vcpus)
+ }
+ }
+ }
+
+ listener?.onSliceFinish(
+ this,
+ totalRequestedBurst,
+ min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
+ totalOvercommissionedBurst,
+ totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
+ min(
+ totalAllocatedUsage,
+ totalRequestedUsage
+ ), // The allocated usage might be slightly higher due to FP rounding
+ totalRequestedUsage
+ )
+ }
+ }
+
+ /**
+ * A scheduling command processed by the scheduler.
+ */
+ private sealed class SchedulerCommand {
+ /**
+ * Schedule the specified VM on the hypervisor.
+ */
+ data class Schedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * De-schedule the specified VM on the hypervisor.
+ */
+ data class Deschedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * Interrupt the scheduler.
+ */
+ object Interrupt : SchedulerCommand()
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @param ctx The execution context the vCPU runs in.
+ * @param triggerMode The mode when to trigger the VM exit.
+ * @param merge The function to merge consecutive slices on spillover.
+ * @param select The function to select on finish.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private data class VmSession(
+ val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
+ var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
+ var select: () -> Unit = {}
+ ) {
+ /**
+ * The vCPUs of this virtual machine.
+ */
+ val vcpus: List<VCpu>
+
+ /**
+ * The slices that the VM wants to run.
+ */
+ var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
+
+ /**
+ * The current active slice.
+ */
+ var activeSlice: SimExecutionContext.Slice? = null
+
+ /**
+ * The current deadline of the VM.
+ */
+ val deadline: Long
+ get() = activeSlice?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * A flag to indicate that the VM is idle.
+ */
+ val isIdle: Boolean
+ get() = activeSlice == null
+
+ /**
+ * The usage of the virtual machine.
+ */
+ val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ init {
+ vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
+ }
+
+ /**
+ * Schedule the given slices on this vCPU, replacing the existing slices.
+ */
+ fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
+ queue = slices.iterator()
+
+ if (queue.hasNext()) {
+ activeSlice = queue.next()
+ refresh()
+ }
+ }
+
+ /**
+ * Cancel the existing workload on the VM.
+ */
+ fun cancel() {
+ queue = emptyList<SimExecutionContext.Slice>().iterator()
+ activeSlice = null
+ refresh()
+ }
+
+ /**
+ * Finish the current slice of the VM.
+ *
+ * @return `true` if the vCPUs may be descheduled, `false` otherwise.
+ */
+ fun finish(): Boolean {
+ val activeSlice = activeSlice ?: return true
+
+ return if (queue.hasNext()) {
+ val needsMerge = activeSlice.burst.any { it > 0 }
+ val candidateSlice = queue.next()
+ val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
+
+ this.activeSlice = slice
+
+ // Update the vCPU cache
+ refresh()
+
+ false
+ } else {
+ this.activeSlice = null
+ select()
+ true
+ }
+ }
+
+ /**
+ * Refresh the vCPU cache.
+ */
+ fun refresh() {
+ vcpus.forEach { it.refresh() }
+ usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size
+ }
+ }
+
+ /**
+ * A virtual CPU that can be scheduled on a physical CPU.
+ *
+ * @param vm The VM of which this vCPU is part.
+ * @param model The model of CPU that this vCPU models.
+ * @param id The id of the vCPU with respect to the VM.
+ */
+ private data class VCpu(
+ val vm: VmSession,
+ val model: ProcessingUnit,
+ val id: Int
+ ) : Comparable<VCpu> {
+ /**
+ * The current limit on the vCPU.
+ */
+ var limit: Double = 0.0
+
+ /**
+ * The limit allocated by the hypervisor.
+ */
+ var allocatedLimit: Double = 0.0
+
+ /**
+ * The current burst running on the vCPU.
+ */
+ var burst: Long = 0L
+
+ /**
+ * Consume the specified burst on this vCPU.
+ */
+ fun consume(burst: Long): Boolean {
+ this.burst = max(0, this.burst - burst)
+
+ // Flush the result to the slice if it exists
+ vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
+
+ val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false
+ return actuallyExists && this.burst == 0L
+ }
+
+ /**
+ * Refresh the information of this vCPU based on the current slice.
+ */
+ fun refresh() {
+ limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
+ burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
+ }
+
+ /**
+ * Compare to another vCPU based on the current load of the vCPU.
+ */
+ override fun compareTo(other: VCpu): Int {
+ return limit.compareTo(other.limit)
+ }
+
+ /**
+ * Create a string representation of the vCPU.
+ */
+ override fun toString(): String =
+ "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
+ }
+
+ /**
+ * The execution context in which a VM runs.
+ *
+ */
+ private inner class VmExecutionContext(val session: VmSession) :
+ SimExecutionContext, DisposableHandle {
+ override val machine: SimMachineModel
+ get() = session.model
+
+ override val clock: Clock
+ get() = this@SimFairSharedHypervisor.clock
+
+ @OptIn(InternalCoroutinesApi::class)
+ override fun onRun(
+ batch: Sequence<SimExecutionContext.Slice>,
+ triggerMode: SimExecutionContext.TriggerMode,
+ merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
+ ): SelectClause0 = object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+ session.triggerMode = triggerMode
+ session.merge = merge
+ session.select = {
+ if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+ session.schedule(batch)
+ // Indicate to the hypervisor that the VM should be re-scheduled
+ schedulingQueue.offer(SchedulerCommand.Schedule(session))
+ select.disposeOnSelect(this@VmExecutionContext)
+ }
+ }
+
+ override fun dispose() {
+ if (!session.isIdle) {
+ session.cancel()
+ schedulingQueue.offer(SchedulerCommand.Deschedule(session))
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
index 6087227b..fb4cd137 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
@@ -22,267 +22,23 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.selects.select
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import java.time.Clock
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
- * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently.
- *
- * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
- * @param clock The virtual clock to track the simulation time.
+ * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine]
+ * concurrently.
*/
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
-public class SimHypervisor(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- private val listener: Listener? = null
-) : SimWorkload {
- /**
- * A set for tracking the VM context objects.
- */
- private val vms: MutableSet<VmExecutionContext> = mutableSetOf()
-
- /**
- * A flag to indicate the driver is stopped.
- */
- private var stopped: Boolean = false
-
- /**
- * The channel for scheduling new CPU requests.
- */
- private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
-
+public interface SimHypervisor : SimWorkload {
/**
* Create a [SimMachine] instance on which users may run a [SimWorkload].
*
* @param model The machine to create.
*/
- public fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel? = null): SimMachine {
- val vm = VmSession(model, performanceInterferenceModel)
- val vmCtx = VmExecutionContext(vm)
-
- return object : SimMachine {
- override val model: SimMachineModel
- get() = vmCtx.machine
-
- override val usage: StateFlow<Double>
- get() = vm.usage
-
- /**
- * The current active workload.
- */
- private var activeWorkload: SimWorkload? = null
-
- override suspend fun run(workload: SimWorkload) {
- require(activeWorkload == null) { "Run should not be called concurrently" }
-
- try {
- activeWorkload = workload
- workload.run(vmCtx)
- } finally {
- activeWorkload = null
- }
- }
-
- override fun toString(): String = "SimVirtualMachine"
- }
- }
-
- /**
- * Run the scheduling process of the hypervisor.
- */
- override suspend fun run(ctx: SimExecutionContext) {
- val model = ctx.machine
- val maxUsage = model.cpus.sumByDouble { it.frequency }
- val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency }
-
- val vms = mutableSetOf<VmSession>()
- val vcpus = mutableListOf<VCpu>()
-
- val usage = DoubleArray(model.cpus.size)
- val burst = LongArray(model.cpus.size)
-
- fun process(command: SchedulerCommand) {
- when (command) {
- is SchedulerCommand.Schedule -> {
- vms += command.vm
- vcpus.addAll(command.vm.vcpus)
- }
- is SchedulerCommand.Deschedule -> {
- vms -= command.vm
- vcpus.removeAll(command.vm.vcpus)
- }
- is SchedulerCommand.Interrupt -> {
- }
- }
- }
-
- fun processRemaining() {
- var command = schedulingQueue.poll()
- while (command != null) {
- process(command)
- command = schedulingQueue.poll()
- }
- }
-
- while (!stopped) {
- // Wait for a request to be submitted if we have no work yet.
- if (vcpus.isEmpty()) {
- process(schedulingQueue.receive())
- }
-
- processRemaining()
-
- val start = clock.millis()
-
- var duration: Double = Double.POSITIVE_INFINITY
- var deadline: Long = Long.MAX_VALUE
- var availableUsage = maxUsage
- var totalRequestedUsage = 0.0
- var totalRequestedBurst = 0L
-
- // Sort the vCPUs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- vcpus.sort()
-
- // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- for ((i, req) in vcpus.withIndex()) {
- val remaining = vcpus.size - i
- val availableShare = availableUsage / remaining
- val grantedUsage = min(req.limit, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, req.vm.deadline)
-
- // Ignore empty CPUs
- if (grantedUsage <= 0 || req.burst <= 0) {
- req.allocatedLimit = 0.0
- continue
- }
-
- totalRequestedUsage += req.limit
- totalRequestedBurst += req.burst
-
- req.allocatedLimit = grantedUsage
- availableUsage -= grantedUsage
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, req.burst / grantedUsage)
- }
-
- // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
- duration = 300.0
-
- val totalAllocatedUsage = maxUsage - availableUsage
- var totalAllocatedBurst = 0L
- availableUsage = totalAllocatedUsage
- val serverLoad = totalAllocatedUsage / maxUsage
-
- // Divide the requests over the available capacity of the pCPUs fairly
- for (i in pCPUs) {
- val maxCpuUsage = model.cpus[i].frequency
- val fraction = maxCpuUsage / maxUsage
- val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
- val grantedBurst = ceil(duration * grantedUsage).toLong()
-
- usage[i] = grantedUsage
- burst[i] = grantedBurst
- totalAllocatedBurst += grantedBurst
- availableUsage -= grantedUsage
- }
-
- // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
- // time, so not all of the burst may be executed.
- select<Boolean> {
- schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE)
- .invoke { false }
- }
-
- val end = clock.millis()
-
- // No work was performed
- if ((end - start) <= 0) {
- continue
- }
-
- // The total requested burst that the VMs wanted to run in the time-frame that we ran.
- val totalRequestedSubBurst =
- vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
- val totalRemainder = burst.sum()
- val totalGrantedBurst = totalAllocatedBurst - totalRemainder
-
- // The burst that was lost due to overcommissioning of CPU resources
- var totalOvercommissionedBurst = 0L
- // The burst that was lost due to interference.
- var totalInterferedBurst = 0L
-
- val vmIterator = vms.iterator()
- while (vmIterator.hasNext()) {
- val vm = vmIterator.next()
-
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0
- var hasFinished = false
-
- for (vcpu in vm.vcpus) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = vcpu.allocatedLimit / totalAllocatedUsage
-
- // Compute the burst time that the VM was actually granted
- val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
-
- // The burst that was actually used by the VM
- val usedBurst = ceil(grantedBurst * performanceScore).toLong()
-
- totalInterferedBurst += grantedBurst - usedBurst
-
- // Compute remaining burst time to be executed for the request
- if (vcpu.consume(usedBurst)) {
- hasFinished = true
- } else if (vm.deadline <= end) {
- // Request must have its entire burst consumed or otherwise we have overcommission
- // Note that we count the overcommissioned burst if the hypervisor has failed.
- totalOvercommissionedBurst += vcpu.burst
- }
- }
-
- if (hasFinished || vm.deadline <= end) {
- // Mark the VM as finished and deschedule the VMs if needed
- if (vm.finish()) {
- vmIterator.remove()
- vcpus.removeAll(vm.vcpus)
- }
- }
- }
-
- listener?.onSliceFinish(
- this,
- totalRequestedBurst,
- min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
- totalOvercommissionedBurst,
- totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
- min(
- totalAllocatedUsage,
- totalRequestedUsage
- ), // The allocated usage might be slightly higher due to FP rounding
- totalRequestedUsage
- )
- }
- }
+ public fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel? = null
+ ): SimMachine
/**
* Event listener for hypervisor events.
@@ -301,235 +57,4 @@ public class SimHypervisor(
cpuDemand: Double
)
}
-
- /**
- * A scheduling command processed by the scheduler.
- */
- private sealed class SchedulerCommand {
- /**
- * Schedule the specified VM on the hypervisor.
- */
- data class Schedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * De-schedule the specified VM on the hypervisor.
- */
- data class Deschedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * Interrupt the scheduler.
- */
- object Interrupt : SchedulerCommand()
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @param ctx The execution context the vCPU runs in.
- * @param triggerMode The mode when to trigger the VM exit.
- * @param merge The function to merge consecutive slices on spillover.
- * @param select The function to select on finish.
- */
- @OptIn(InternalCoroutinesApi::class)
- private data class VmSession(
- val model: SimMachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
- var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
- var select: () -> Unit = {}
- ) {
- /**
- * The vCPUs of this virtual machine.
- */
- val vcpus: List<VCpu>
-
- /**
- * The slices that the VM wants to run.
- */
- var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
-
- /**
- * The current active slice.
- */
- var activeSlice: SimExecutionContext.Slice? = null
-
- /**
- * The current deadline of the VM.
- */
- val deadline: Long
- get() = activeSlice?.deadline ?: Long.MAX_VALUE
-
- /**
- * A flag to indicate that the VM is idle.
- */
- val isIdle: Boolean
- get() = activeSlice == null
-
- /**
- * The usage of the virtual machine.
- */
- val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- init {
- vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
- }
-
- /**
- * Schedule the given slices on this vCPU, replacing the existing slices.
- */
- fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
- queue = slices.iterator()
-
- if (queue.hasNext()) {
- activeSlice = queue.next()
- refresh()
- }
- }
-
- /**
- * Cancel the existing workload on the VM.
- */
- fun cancel() {
- queue = emptyList<SimExecutionContext.Slice>().iterator()
- activeSlice = null
- refresh()
- }
-
- /**
- * Finish the current slice of the VM.
- *
- * @return `true` if the vCPUs may be descheduled, `false` otherwise.
- */
- fun finish(): Boolean {
- val activeSlice = activeSlice ?: return true
-
- return if (queue.hasNext()) {
- val needsMerge = activeSlice.burst.any { it > 0 }
- val candidateSlice = queue.next()
- val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
-
- this.activeSlice = slice
-
- // Update the vCPU cache
- refresh()
-
- false
- } else {
- this.activeSlice = null
- select()
- true
- }
- }
-
- /**
- * Refresh the vCPU cache.
- */
- fun refresh() {
- vcpus.forEach { it.refresh() }
- usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size
- }
- }
-
- /**
- * A virtual CPU that can be scheduled on a physical CPU.
- *
- * @param vm The VM of which this vCPU is part.
- * @param model The model of CPU that this vCPU models.
- * @param id The id of the vCPU with respect to the VM.
- */
- private data class VCpu(
- val vm: VmSession,
- val model: ProcessingUnit,
- val id: Int
- ) : Comparable<VCpu> {
- /**
- * The current limit on the vCPU.
- */
- var limit: Double = 0.0
-
- /**
- * The limit allocated by the hypervisor.
- */
- var allocatedLimit: Double = 0.0
-
- /**
- * The current burst running on the vCPU.
- */
- var burst: Long = 0L
-
- /**
- * Consume the specified burst on this vCPU.
- */
- fun consume(burst: Long): Boolean {
- this.burst = max(0, this.burst - burst)
-
- // Flush the result to the slice if it exists
- vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
-
- return allocatedLimit > 0.0 && this.burst == 0L
- }
-
- /**
- * Refresh the information of this vCPU based on the current slice.
- */
- fun refresh() {
- limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
- burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
- }
-
- /**
- * Compare to another vCPU based on the current load of the vCPU.
- */
- override fun compareTo(other: VCpu): Int {
- return limit.compareTo(other.limit)
- }
-
- /**
- * Create a string representation of the vCPU.
- */
- override fun toString(): String =
- "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
- }
-
- /**
- * The execution context in which a VM runs.
- *
- */
- private inner class VmExecutionContext(val session: VmSession) :
- SimExecutionContext, DisposableHandle {
- override val machine: SimMachineModel
- get() = session.model
-
- override val clock: Clock
- get() = this@SimHypervisor.clock
-
- @OptIn(InternalCoroutinesApi::class)
- override fun onRun(
- batch: Sequence<SimExecutionContext.Slice>,
- triggerMode: SimExecutionContext.TriggerMode,
- merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
- ): SelectClause0 = object : SelectClause0 {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- session.triggerMode = triggerMode
- session.merge = merge
- session.select = {
- if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion)
- }
- }
- session.schedule(batch)
- // Indicate to the hypervisor that the VM should be re-scheduled
- schedulingQueue.offer(SchedulerCommand.Schedule(session))
- select.disposeOnSelect(this@VmExecutionContext)
- }
- }
-
- override fun dispose() {
- if (!session.isIdle) {
- session.cancel()
- schedulingQueue.offer(SchedulerCommand.Deschedule(session))
- }
- }
- }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 918a78bd..0d2c9374 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -39,7 +39,7 @@ public class SimFlopsWorkload(
public val utilization: Double = 0.8
) : SimWorkload {
init {
- require(flops >= 0) { "Negative number of flops" }
+ require(flops >= 0) { "Negative number of FLOPs" }
require(cores > 0) { "Negative number of cores or no cores" }
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
@@ -54,4 +54,6 @@ public class SimFlopsWorkload(
ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST)
}
+
+ override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)"
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index 78bd2940..e7fdd4b2 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -105,7 +105,7 @@ internal class SimHypervisorTest {
)
val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimHypervisor(scope, clock, listener)
+ val hypervisor = SimFairSharedHypervisor(scope, clock, listener)
launch {
machine.run(hypervisor)
@@ -120,7 +120,7 @@ internal class SimHypervisorTest {
assertAll(
{ Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") },
+ { Assertions.assertEquals(2082000, listener.totalRequestedBurst, "Requested Burst does not match") },
{ Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") },
{ Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") },
{ Assertions.assertEquals(1200001, scope.currentTime) }
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index fa03508c..4346efcc 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -32,8 +32,10 @@ dependencies {
api(project(":opendc-compute:opendc-compute-core"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
+ implementation("io.github.microutils:kotlin-logging:1.7.9")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") {
exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 91657f83..2c8d9a0b 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -29,41 +29,43 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
import org.opendc.workflows.service.stage.job.JobOrderPolicy
-import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy
-import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.TaskOrderPolicy
import org.opendc.workflows.workload.Job
+import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
import java.time.Clock
import java.util.*
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
- * Topology Scheduling.
+ * Datacenter Scheduling.
*/
public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: ProvisioningService,
+ private val provisioningService: VirtProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
- taskOrderPolicy: TaskOrderPolicy,
- resourceFilterPolicy: ResourceFilterPolicy,
- resourceSelectionPolicy: ResourceSelectionPolicy
+ taskOrderPolicy: TaskOrderPolicy
) : WorkflowService {
+ /**
+ * The logger instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -101,25 +103,10 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The nodes that are controlled by the service.
- */
- internal lateinit var nodes: List<Node>
-
- /**
- * The available nodes.
- */
- internal val available: MutableSet<Node> = mutableSetOf()
-
- /**
- * The maximum number of incoming jobs.
- */
- private val throttleLimit: Int = 20000
-
- /**
* The load of the system.
*/
internal val load: Double
- get() = (available.size / nodes.size.toDouble())
+ get() = (activeTasks.size / provisioningService.hostCount.toDouble())
/**
* The root listener of this scheduler.
@@ -170,22 +157,13 @@ public class StageWorkflowService(
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
- private val resourceFilterPolicy: ResourceFilterPolicy.Logic
- private val resourceSelectionPolicy: Comparator<Node>
init {
- coroutineScope.launch {
- nodes = provisioningService.nodes().toList()
- available.addAll(nodes)
- }
-
this.mode = mode(this)
this.jobAdmissionPolicy = jobAdmissionPolicy(this)
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
- this.resourceFilterPolicy = resourceFilterPolicy(this)
- this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
@@ -290,26 +268,25 @@ public class StageWorkflowService(
// T3 Per task
while (taskQueue.isNotEmpty()) {
val instance = taskQueue.peek()
- val host: Node? = available.firstOrNull()
- if (host != null) {
- // T4 Submit task to machine
- available -= host
+ val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
+ val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
+ val image = instance.task.image
+ coroutineScope.launch {
+ val server = provisioningService.deploy(instance.task.name, image, flavor)
+
instance.state = TaskStatus.ACTIVE
- val newHost = provisioningService.deploy(host, instance.task.image)
- val server = newHost.server!!
- instance.host = newHost
+ instance.server = server
taskByServer[server] = instance
+
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
.launchIn(coroutineScope)
-
- activeTasks += instance
- taskQueue.poll()
- rootListener.taskAssigned(instance)
- } else {
- break
}
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
}
}
@@ -333,7 +310,6 @@ public class StageWorkflowService(
task.state = TaskStatus.FINISHED
task.finishedAt = clock.millis()
job.tasks.remove(task)
- available += task.host!!
activeTasks -= task
tracer.commit(
WorkflowEvent.TaskFinished(
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index ed023c82..d1eb6704 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
@@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) {
}
}
- public var host: Node? = null
+ public var server: Server? = null
/**
* Mark the specified [TaskView] as terminated.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
deleted file mode 100644
index 8dc323ec..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [ResourceSelectionPolicy] that selects the first machine that is available.
- */
-public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> =
- Comparator<Node> { _, _ -> 1 }
-
- override fun toString(): String = "First-Fit"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
deleted file mode 100644
index ac79a9ce..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for
- * the task.
- */
-public object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
- override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic =
- object : ResourceFilterPolicy.Logic {
- override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> =
- hosts.filter { it in scheduler.available }
- }
-
- override fun toString(): String = "functional"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
deleted file mode 100644
index caf87c70..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import java.util.*
-
-/**
- * A [ResourceSelectionPolicy] that randomly orders the machines.
- */
-public object RandomResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> {
- private val ids: Map<Node, Long>
-
- init {
- val random = Random(123)
- ids = scheduler.nodes.associateWith { random.nextLong() }
- }
-
- override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] }
- }
-
- override fun toString(): String = "Random"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
deleted file mode 100644
index 4923a34b..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and
- * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic
- * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc.
- */
-public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> {
- public interface Logic {
- /**
- * Filter the list of machines based on dynamic information.
- *
- * @param hosts The hosts to filter.
- * @param task The task that is to be scheduled.
- * @return The machines on which the task can be scheduled.
- */
- public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node>
- }
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
deleted file mode 100644
index 990b990a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
- * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
- */
-public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>>
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
index d02e2b4e..4305aa57 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
@@ -28,3 +28,8 @@ package org.opendc.workflows.workload
* Meta-data key for the deadline of a task.
*/
public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline"
+
+/**
+ * Meta-data key for the number of cores needed for a task.
+ */
+public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores"
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 62955a11..b97cb915 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -37,14 +37,14 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
-import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import kotlin.math.max
@@ -74,18 +74,26 @@ internal class StageWorkflowSchedulerIntegrationTest {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
.use { it.construct(testScope, clock) }
+ val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
StageWorkflowService(
testScope,
clock,
tracer,
- environment.platforms[0].zones[0].services[ProvisioningService],
+ provisioner,
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- resourceFilterPolicy = FunctionalResourceFilterPolicy,
- resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
}
@@ -110,7 +118,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - clock.millis()))
+ delay(max(0, time - clock.millis()))
scheduler.submit(job)
}
}
@@ -118,6 +126,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope.advanceUntilIdle()
assertAll(
+ { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) },
{ assertNotEquals(0, jobsSubmitted, "No jobs submitted") },
{ assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") },
{ assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") },