summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-11 20:13:46 +0100
committerGitHub <noreply@github.com>2021-01-11 20:13:46 +0100
commitbf558fe36c9de52310e85044ee4447b45bd50b75 (patch)
treebea8e47037660c88df42e04105e7a0b7f709173a
parent42e9a5b5b610f41a03e68f6fc781c54b9402925b (diff)
parent9dbb7bbcc2202955c715aaa3b28c70641a2fbd5b (diff)
Merge pull request #71 from atlarge-research/perf/workload
Convert to pull-based workload model
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt23
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt109
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt44
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt52
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt8
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt18
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt16
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt2
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt8
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt6
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt2
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt347
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt113
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt590
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriverWorkload.kt)22
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt517
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt18
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt41
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt9
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt284
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt32
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt41
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt60
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt57
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt117
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt88
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt175
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt22
-rw-r--r--simulator/opendc-utils/build.gradle.kts5
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt209
-rw-r--r--simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt147
-rw-r--r--simulator/opendc-workflows/build.gradle.kts1
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt5
46 files changed, 2315 insertions, 1027 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
index 5ecfd357..68cc7b50 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.core.virt.driver
import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.image.Image
import org.opendc.compute.core.virt.HypervisorEvent
@@ -40,6 +41,11 @@ public interface VirtDriver {
public val events: Flow<HypervisorEvent>
/**
+ * Determine whether the specified [flavor] can still fit on this driver.
+ */
+ public fun canFit(flavor: Flavor): Boolean
+
+ /**
* Spawn the given [Image] on the compute resource of this driver.
*
* @param name The name of the server to spawn.
@@ -50,7 +56,7 @@ public interface VirtDriver {
public suspend fun spawn(
name: String,
image: Image,
- flavor: org.opendc.compute.core.Flavor
+ flavor: Flavor
): Server
public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index d7570e54..dc93e956 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -29,10 +29,10 @@ plugins {
dependencies {
api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-simulator:opendc-simulator-compute"))
+ api(project(":opendc-simulator:opendc-simulator-failures"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging:1.7.9")
- implementation(project(":opendc-simulator:opendc-simulator-compute"))
- api(project(":opendc-simulator:opendc-simulator-failures"))
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
index 97f550ba..7a978a53 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
@@ -41,6 +41,7 @@ import org.opendc.core.services.ServiceRegistry
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimExecutionContext
import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.workload.SimResourceCommand
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
@@ -139,15 +140,31 @@ public class SimBareMetalDriver(
events
)
+ val delegate = (node.image as SimWorkloadImage).workload
// Wrap the workload to pass in a ComputeSimExecutionContext
val workload = object : SimWorkload {
- override suspend fun run(ctx: SimExecutionContext) {
- val wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
+ lateinit var wrappedCtx: ComputeSimExecutionContext
+
+ override fun onStart(ctx: SimExecutionContext) {
+ wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
override val server: Server
get() = nodeState.value.server!!
+
+ override fun toString(): String = "WrappedSimExecutionContext"
}
- (node.image as SimWorkloadImage).workload.run(wrappedCtx)
+
+ delegate.onStart(wrappedCtx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return delegate.onStart(wrappedCtx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return delegate.onNext(wrappedCtx, cpu, remainingWork)
}
+
+ override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
}
job = coroutineScope.launch {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
index 249979a8..d7a8a8b2 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
@@ -36,25 +36,25 @@ import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.utils.flow.EventFlow
-import java.time.Clock
import java.util.*
/**
* A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor].
*/
-public class SimVirtDriver(
- private val coroutineScope: CoroutineScope,
- clock: Clock,
- private val ctx: SimExecutionContext
-) : VirtDriver {
+public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload {
+ /**
+ * The execution context in which the [VirtDriver] runs.
+ */
+ private lateinit var ctx: ComputeSimExecutionContext
/**
* The server hosting this hypervisor.
*/
public val server: Server
- get() = (ctx as ComputeSimExecutionContext).server
+ get() = ctx.server
/**
* The [EventFlow] to emit the events.
@@ -66,35 +66,33 @@ public class SimVirtDriver(
/**
* Current total memory use of the images on this hypervisor.
*/
- private var availableMemory: Long = ctx.machine.memory.map { it.size }.sum()
+ private var availableMemory: Long = 0
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor = SimFairSharedHypervisor(
- coroutineScope,
- clock,
+ private val hypervisor = hypervisor.create(
object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
cpuUsage: Double,
cpuDemand: Double
) {
eventFlow.emit(
HypervisorEvent.SliceFinished(
this@SimVirtDriver,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
+ requestedWork,
+ grantedWork,
+ overcommittedWork,
+ interferedWork,
cpuUsage,
cpuDemand,
vms.size,
- (ctx as ComputeSimExecutionContext).server
+ ctx.server
)
)
}
@@ -106,6 +104,14 @@ public class SimVirtDriver(
*/
private val vms = HashSet<VirtualMachine>()
+ override fun canFit(flavor: Flavor): Boolean {
+ val sufficientMemory = availableMemory > flavor.memorySize
+ val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount
+ val canFit = hypervisor.canFit(flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server {
val requiredMemory = flavor.memorySize
if (availableMemory - requiredMemory < 0) {
@@ -126,19 +132,25 @@ public class SimVirtDriver(
)
availableMemory -= requiredMemory
- val originalCpu = ctx.machine.cpus[0]
- val processingNode = originalCpu.node.copy(coreCount = flavor.cpuCount)
- val processingUnits = (0 until flavor.cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, flavor.memorySize))
-
- val machine = SimMachineModel(processingUnits, memoryUnits)
- val vm = VirtualMachine(server, events, hypervisor.createMachine(machine))
+ val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel()))
vms.add(vm)
vmStarted(vm)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): SimMachineModel {
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = cpuCount)
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return SimMachineModel(processingUnits, memoryUnits)
+ }
+
private fun vmStarted(vm: VirtualMachine) {
vms.forEach { it ->
vm.performanceInterferenceModel?.onStart(it.server.image.name)
@@ -154,18 +166,35 @@ public class SimVirtDriver(
/**
* A virtual machine instance that the driver manages.
*/
- private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, machine: SimMachine) {
+ private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) {
val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val job = coroutineScope.launch {
+ val delegate = (server.image as SimWorkloadImage).workload
+ // Wrap the workload to pass in a ComputeSimExecutionContext
val workload = object : SimWorkload {
- override suspend fun run(ctx: SimExecutionContext) {
- val wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
+ lateinit var wrappedCtx: ComputeSimExecutionContext
+
+ override fun onStart(ctx: SimExecutionContext) {
+ wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
override val server: Server
- get() = this@VirtualMachine.server
+ get() = server
+
+ override fun toString(): String = "WrappedSimExecutionContext"
}
- (server.image as SimWorkloadImage).workload.run(wrappedCtx)
+
+ delegate.onStart(wrappedCtx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return delegate.onStart(wrappedCtx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return delegate.onNext(wrappedCtx, cpu, remainingWork)
}
+
+ override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
}
delay(1) // TODO Introduce boot time
@@ -175,6 +204,8 @@ public class SimVirtDriver(
exit(null)
} catch (cause: Throwable) {
exit(cause)
+ } finally {
+ machine.close()
}
}
@@ -206,7 +237,17 @@ public class SimVirtDriver(
}
}
- public suspend fun run() {
- hypervisor.run(ctx)
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx as ComputeSimExecutionContext
+ this.availableMemory = ctx.machine.memory.map { it.size }.sum()
+ this.hypervisor.onStart(ctx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return hypervisor.onStart(ctx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return hypervisor.onNext(ctx, cpu, remainingWork)
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index 17de3de7..defea888 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -40,7 +40,9 @@ import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.compute.core.virt.service.events.*
import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.simulator.compute.SimHypervisorProvider
import org.opendc.trace.core.EventTracer
+import org.opendc.utils.TimerScheduler
import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
@@ -55,7 +57,8 @@ public class SimVirtProvisioningService(
private val provisioningService: ProvisioningService,
public val allocationPolicy: AllocationPolicy,
private val tracer: EventTracer,
- private val schedulingQuantum: Long = 300000 // 5 minutes in milliseconds
+ private val hypervisor: SimHypervisorProvider,
+ private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds
) : VirtProvisioningService {
/**
* The logger instance to use.
@@ -75,7 +78,7 @@ public class SimVirtProvisioningService(
/**
* The incoming images to be processed by the provisioner.
*/
- private val incomingImages: MutableSet<ImageView> = mutableSetOf()
+ private val incomingImages: Deque<ImageView> = ArrayDeque()
/**
* The active images in the system.
@@ -103,11 +106,16 @@ public class SimVirtProvisioningService(
override val events: Flow<VirtProvisioningEvent> = eventFlow
+ /**
+ * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ */
+ private var scheduler: TimerScheduler<Unit> = TimerScheduler(coroutineScope, clock)
+
init {
coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
- val workload = SimVirtDriverWorkload()
+ val workload = SimVirtDriver(coroutineScope, hypervisor)
val hypervisorImage = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), workload)
launch {
var init = false
@@ -125,7 +133,7 @@ public class SimVirtProvisioningService(
}.launchIn(this)
delay(1)
- onHypervisorAvailable(server, workload.driver)
+ onHypervisorAvailable(server, workload)
}
}
}
@@ -169,10 +177,9 @@ public class SimVirtProvisioningService(
provisionedNodes.forEach { node -> provisioningService.stop(node) }
}
- private var call: Job? = null
-
private fun requestCycle() {
- if (call != null) {
+ // Bail out in case we have already requested a new cycle.
+ if (scheduler.isTimerActive(Unit)) {
return
}
@@ -181,22 +188,20 @@ public class SimVirtProvisioningService(
// We calculate here the delay until the next scheduling slot.
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
- val call = coroutineScope.launch {
- delay(delay)
- this@SimVirtProvisioningService.call = null
- schedule()
+ scheduler.startSingleTimer(Unit, delay) {
+ coroutineScope.launch { schedule() }
}
- this.call = call
}
private suspend fun schedule() {
- val imagesToBeScheduled = incomingImages.toSet()
-
- for (imageInstance in imagesToBeScheduled) {
+ while (incomingImages.isNotEmpty()) {
+ val imageInstance = incomingImages.peekFirst()
val requiredMemory = imageInstance.flavor.memorySize
val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
- if (selectedHv == null) {
+ if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) {
+ logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." }
+
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
@@ -208,12 +213,13 @@ public class SimVirtProvisioningService(
submittedVms,
runningVms,
finishedVms,
- queuedVms,
+ --queuedVms,
++unscheduledVms
)
)
- incomingImages -= imageInstance
+ // Remove the incoming image
+ incomingImages.poll()
logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
continue
@@ -224,7 +230,7 @@ public class SimVirtProvisioningService(
try {
logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
- incomingImages -= imageInstance
+ incomingImages.poll()
// Speculatively update the hypervisor view information to prevent other images in the queue from
// deciding on stale values.
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
index 0f1bd444..fb8a5f47 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
@@ -64,7 +64,7 @@ internal class SimBareMetalDriverTest {
testScope.launch {
val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
- val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(4_000, 2, utilization = 1.0))
+ val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(4_000, utilization = 1.0))
// Batch driver commands
withContext(coroutineContext) {
@@ -84,6 +84,6 @@ internal class SimBareMetalDriverTest {
testScope.advanceUntilIdle()
assertEquals(ServerState.SHUTOFF, finalState)
- assertEquals(1001, finalTime)
+ assertEquals(501, finalTime)
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
index def78ce7..a33a4e5f 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
@@ -64,7 +64,7 @@ internal class SimProvisioningServiceTest {
val clock = DelayControllerClockAdapter(testScope)
testScope.launch {
- val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(1000, 2))
+ val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(1000))
val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
val provisioner = SimpleProvisioningService()
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
index 394e87c6..1831eae0 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -66,17 +67,17 @@ internal class SimVirtDriverTest {
}
/**
- * Test overcommissioning of a hypervisor.
+ * Test overcommitting of resources by the hypervisor.
*/
@Test
- fun overcommission() {
- var requestedBurst = 0L
- var grantedBurst = 0L
- var overcommissionedBurst = 0L
+ fun testOvercommitted() {
+ var requestedWork = 0L
+ var grantedWork = 0L
+ var overcommittedWork = 0L
scope.launch {
- val virtDriverWorkload = SimVirtDriverWorkload()
- val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriverWorkload)
+ val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider())
+ val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriver)
val duration = 5 * 60L
val vmImageA = SimWorkloadImage(
UUID.randomUUID(),
@@ -84,10 +85,10 @@ internal class SimVirtDriverTest {
emptyMap(),
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
),
)
)
@@ -97,10 +98,10 @@ internal class SimVirtDriverTest {
emptyMap(),
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
)
),
)
@@ -115,31 +116,30 @@ internal class SimVirtDriverTest {
delay(5)
val flavor = Flavor(2, 0)
- val vmDriver = virtDriverWorkload.driver
- vmDriver.events
+ virtDriver.events
.onEach { event ->
when (event) {
is HypervisorEvent.SliceFinished -> {
- requestedBurst += event.requestedBurst
- grantedBurst += event.grantedBurst
- overcommissionedBurst += event.overcommissionedBurst
+ requestedWork += event.requestedBurst
+ grantedWork += event.grantedBurst
+ overcommittedWork += event.overcommissionedBurst
}
}
}
.launchIn(this)
- vmDriver.spawn("a", vmImageA, flavor)
- vmDriver.spawn("b", vmImageB, flavor)
+ virtDriver.spawn("a", vmImageA, flavor)
+ virtDriver.spawn("b", vmImageB, flavor)
}
scope.advanceUntilIdle()
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(2082000, requestedBurst, "Requested Burst does not match") },
- { assertEquals(2013600, grantedBurst, "Granted Burst does not match") },
- { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") },
- { assertEquals(1200007, scope.currentTime) }
+ { assertEquals(4197600, requestedWork, "Requested work does not match") },
+ { assertEquals(3057600, grantedWork, "Granted work does not match") },
+ { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(1200006, scope.currentTime) }
)
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
index 9ad744f2..202df6df 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
@@ -32,6 +32,7 @@ import org.opendc.compute.simulator.SimVirtProvisioningService
import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.StageWorkflowService
@@ -71,7 +72,7 @@ public fun main(args: Array<String>) {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000)
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
// Wait for the hypervisors to be spawned
delay(10)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index f939738d..1e01e892 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -48,6 +48,7 @@ import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import org.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FailureDomain
@@ -150,7 +151,7 @@ public suspend fun createProvisioner(
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer)
+ val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider())
// Wait for the hypervisors to be spawned
delay(10)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 9bc1a58e..4a318df4 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -66,8 +66,6 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
- tick,
- flops,
duration,
cpuUsage,
cores
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index edef276c..ba22ae15 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader(
/**
* A poisonous fragment.
*/
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0))
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
/**
* The thread to read the records in.
@@ -120,8 +120,6 @@ public class Sc20StreamingParquetTraceReader(
val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
- tick,
- flops,
duration,
cpuUsage,
cores
@@ -204,6 +202,7 @@ public class Sc20StreamingParquetTraceReader(
val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
val fragments = sequence {
+ var time = submissionTime
repeat@ while (true) {
if (externalBuffer.isEmpty()) {
if (hasNext) {
@@ -220,7 +219,8 @@ public class Sc20StreamingParquetTraceReader(
for (fragment in internalBuffer) {
yield(fragment)
- if (fragment.time >= endTime) {
+ time += fragment.duration
+ if (time >= endTime) {
break@repeat
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 2eedb636..c5ad345d 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -90,7 +90,7 @@ class Sc20IntegrationTest {
fun tearDown() = testScope.cleanupTestCoroutines()
@Test
- fun smoke() {
+ fun testLarge() {
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
@@ -148,15 +148,15 @@ class Sc20IntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(207480856422, monitor.totalRequestedBurst) },
- { assertEquals(206510493178, monitor.totalGrantedBurst) },
- { assertEquals(336120436, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1684849230562, monitor.totalRequestedBurst) },
+ { assertEquals(447612683996, monitor.totalGrantedBurst) },
+ { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@Test
- fun small() {
+ fun testSmall() {
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
@@ -195,10 +195,10 @@ class Sc20IntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96410877173, monitor.totalRequestedBurst) },
- { assertEquals(96046583992, monitor.totalGrantedBurst) },
- { assertEquals(19265632, monitor.totalOvercommissionedBurst) },
- { assertEquals(0, monitor.totalInterferedBurst) }
+ { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 9353ef28..90d751ea 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -34,6 +34,7 @@ import java.io.BufferedReader
import java.io.File
import java.io.FileReader
import java.util.*
+import kotlin.math.min
/**
* A [TraceReader] for the public VM workload trace format.
@@ -70,6 +71,7 @@ public class BitbrainsTraceReader(
var vmId = -1L
var cores = -1
var requiredMemory = -1L
+ var startTime = -1L
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
@@ -91,21 +93,17 @@ public class BitbrainsTraceReader(
}
vmId = vmFile.nameWithoutExtension.trim().toLong()
- val timestamp = values[timestampCol].trim().toLong() - 5 * 60
+ startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60)
cores = values[coreCol].trim().toInt()
val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
- val flops: Long = (cpuUsage * 5 * 60 * cores).toLong()
-
if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores))
+ flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores))
} else {
- if (flopsHistory.last().flops != flops) {
+ if (flopsHistory.last().usage != cpuUsage) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- timestamp,
- flops,
traceInterval,
cpuUsage,
cores
@@ -115,8 +113,6 @@ public class BitbrainsTraceReader(
val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
flopsHistory.add(
SimTraceWorkload.Fragment(
- oldFragment.time,
- oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
cores
@@ -151,7 +147,7 @@ public class BitbrainsTraceReader(
)
)
entries[vmId] = TraceEntryImpl(
- flopsHistory.firstOrNull()?.time ?: -1,
+ startTime,
vmWorkload
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index b721905d..c76889c8 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -139,7 +139,7 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)),
+ SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)),
HashSet(),
mapOf(
WORKFLOW_TASK_CORES to cores,
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
index 66efbcd0..78f581ca 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -125,20 +125,16 @@ public class Sc20TraceReader(
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) {
val oldFragment = last!!
SimTraceWorkload.Fragment(
- oldFragment.time,
- oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
cores
)
} else {
val fragment =
- SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores)
+ SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores)
if (last != null) {
yield(last!!)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index 52d41c44..80c54354 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -113,8 +113,6 @@ public class SwfTraceReader(
for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- tick * 1000L,
- 0L,
sliceDuration * 1000L,
0.0,
cores
@@ -138,8 +136,6 @@ public class SwfTraceReader(
) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- tick * 1000L,
- flopsFullSlice / sliceDuration,
sliceDuration * 1000L,
1.0,
cores
@@ -150,8 +146,6 @@ public class SwfTraceReader(
if (runtimePartialSliceRemainder > 0) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder),
- flopsPartialSlice,
sliceDuration,
runtimePartialSliceRemainder / sliceDuration.toDouble(),
cores
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index 381a0b41..d7dc09fa 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -81,7 +81,7 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)),
+ SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)),
HashSet(),
mapOf(
WORKFLOW_TASK_CORES to cores,
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 8db2ab40..45c125c4 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -41,7 +41,6 @@ class SwfTraceReaderTest {
assertEquals(164472, entry.submissionTime)
// 1188 slices for waiting, 0 full and 1 partial running slices
assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size)
- assertEquals(5_100_000L, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().flops)
assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index cd7e5706..844a7c6d 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -28,6 +28,7 @@ plugins {
dependencies {
api(project(":opendc-simulator:opendc-simulator-core"))
+ implementation(project(":opendc-utils"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 5e50a676..812b5f20 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -25,13 +25,13 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.SelectInstance
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
import org.opendc.simulator.compute.workload.SimWorkload
-import java.lang.Runnable
+import org.opendc.utils.TimerScheduler
import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
+import java.util.*
+import kotlin.coroutines.*
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -59,23 +59,29 @@ public class SimBareMetalMachine(
get() = usageState
/**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * The [MutableStateFlow] containing the load of the server.
+ */
+ private val usageState = MutableStateFlow(0.0)
+
+ /**
* The current active workload.
*/
- private var activeWorkload: SimWorkload? = null
+ private var cont: Continuation<Unit>? = null
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The active CPUs of this machine.
*/
- override suspend fun run(workload: SimWorkload) {
- require(activeWorkload == null) { "Run should not be called concurrently" }
+ private var cpus: List<Cpu> = emptyList()
- try {
- activeWorkload = workload
- workload.run(ctx)
- } finally {
- activeWorkload = null
- }
- }
+ /**
+ * The [TimerScheduler] to use for scheduling the interrupts.
+ */
+ private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock)
/**
* The execution context in which the workload runs.
@@ -87,199 +93,220 @@ public class SimBareMetalMachine(
override val clock: Clock
get() = this@SimBareMetalMachine.clock
- override fun onRun(
- batch: Sequence<SimExecutionContext.Slice>,
- triggerMode: SimExecutionContext.TriggerMode,
- merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
- ): SelectClause0 {
- return object : SelectClause0 {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- val context = select.completion.context
-
- // Do not reset the usage state: we will set it ourselves
- usageFlush?.dispose()
- usageFlush = null
-
- val queue = batch.iterator()
- var start = Long.MIN_VALUE
- var currentWork: SliceWork? = null
- var currentDisposable: DisposableHandle? = null
-
- fun schedule(slice: SimExecutionContext.Slice) {
- start = clock.millis()
-
- val isLastSlice = !queue.hasNext()
- val work = SliceWork(slice)
- val candidateDuration = when (triggerMode) {
- SimExecutionContext.TriggerMode.FIRST -> work.minExit
- SimExecutionContext.TriggerMode.LAST -> work.maxExit
- SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start
- }
-
- // Check whether the deadline is exceeded during the run of the slice.
- val duration = min(candidateDuration, slice.deadline - start)
-
- val action = Runnable {
- currentWork = null
-
- // Flush all the work that was performed
- val hasFinished = work.stop(duration)
-
- if (!isLastSlice) {
- val candidateSlice = queue.next()
- val nextSlice =
- // If our previous slice exceeds its deadline, merge it with the next candidate slice
- if (hasFinished)
- candidateSlice
- else
- merge(candidateSlice, slice)
- schedule(nextSlice)
- } else if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion)
- }
- }
-
- // Schedule the flush after the entire slice has finished
- currentDisposable = delay.invokeOnTimeout(duration, action, context)
-
- // Start the slice work
- currentWork = work
- work.start()
- }
-
- // Schedule the first work
- if (queue.hasNext()) {
- schedule(queue.next())
-
- // A DisposableHandle to flush the work in case the call is cancelled
- val disposable = DisposableHandle {
- val end = clock.millis()
- val duration = end - start
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
- currentWork?.stop(duration)
- currentDisposable?.dispose()
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
- val action = {
- usageState.value = 0.0
- usageFlush = null
- }
+ workload.onStart(ctx)
- // Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1, action, context)
- }
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.cpus = model.cpus.map { Cpu(it, workload) }
- select.disposeOnSelect(disposable)
- } else if (select.trySelect()) {
- // No work has been given: select immediately
- block.startCoroutineCancellable(select.completion)
- }
- }
+ for (cpu in cpus) {
+ cpu.start()
}
}
}
/**
- * The [MutableStateFlow] containing the load of the server.
+ * Terminate the specified bare-metal machine.
*/
- private val usageState = MutableStateFlow(0.0)
+ override fun close() {
+ isTerminated = true
+ }
/**
- * A disposable to prevent resetting the usage state for subsequent calls to onRun.
+ * Update the usage of the machine.
*/
- private var usageFlush: DisposableHandle? = null
+ private fun updateUsage() {
+ usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
+ }
/**
- * Cache the [Delay] instance for timing.
- *
- * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
- * XXX Note however that this is an ugly hack which may break in the future.
+ * This method is invoked when one of the CPUs has exited.
*/
- @OptIn(InternalCoroutinesApi::class)
- private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
+ private fun onCpuExit(cpu: Int) {
+ // Check whether all other CPUs have finished
+ if (cpus.all { it.hasExited }) {
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
+ }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs failed.
+ */
+ private fun onCpuFailure(e: Throwable) {
+ // Make sure no other tasks will be resumed.
+ scheduler.cancelAll()
+
+ // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
+ // tasks.
+ val cont = cont
+ this.cont = null
+ cont?.resumeWithException(e)
+ }
/**
- * A slice to be processed.
+ * A physical CPU of the machine.
*/
- private inner class SliceWork(val slice: SimExecutionContext.Slice) {
+ private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) {
/**
- * The duration after which the first processor finishes processing this slice.
+ * The current command.
*/
- val minExit: Long
+ private var currentCommand: CommandWrapper? = null
/**
- * The duration after which the last processor finishes processing this slice.
+ * The actual processing speed.
*/
- val maxExit: Long
+ var speed: Double = 0.0
+ set(value) {
+ field = value
+ updateUsage()
+ }
/**
- * A flag to indicate that the slice will exceed the deadline.
+ * A flag to indicate that the CPU is currently processing a command.
*/
- val exceedsDeadline: Boolean
- get() = slice.deadline < maxExit
+ var isIntermediate: Boolean = false
/**
- * The total amount of CPU usage.
+ * A flag to indicate that the CPU has exited.
*/
- val totalUsage: Double
+ var hasExited: Boolean = false
/**
- * A flag to indicate that this slice is empty.
+ * Process the specified [SimResourceCommand] for this CPU.
*/
- val isEmpty: Boolean
-
- init {
- var totalUsage = 0.0
- var minExit = Long.MAX_VALUE
- var maxExit = 0L
- var nonEmpty = false
-
- // Determine the duration of the first/last CPU to finish
- for (i in 0 until min(model.cpus.size, slice.burst.size)) {
- val cpu = model.cpus[i]
- val usage = min(slice.limit[i], cpu.frequency)
- val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
-
- totalUsage += usage / cpu.frequency
-
- if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
- minExit = min(minExit, cpuDuration)
- maxExit = max(maxExit, cpuDuration)
- nonEmpty = true
+ fun process(command: SimResourceCommand) {
+ val timestamp = clock.millis()
+
+ val task = when (command) {
+ is SimResourceCommand.Idle -> {
+ speed = 0.0
+
+ val deadline = command.deadline
+
+ require(deadline >= timestamp) { "Deadline already passed" }
+
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ } else {
+ null
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= timestamp) { "Deadline already passed" }
+
+ speed = min(model.frequency, limit)
+
+ // The required duration to process all the work
+ val finishedAt = timestamp + ceil(work / speed * 1000).toLong()
+
+ scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() }
+ }
+ is SimResourceCommand.Exit -> {
+ speed = 0.0
+ hasExited = true
+
+ onCpuExit(model.id)
+
+ null
}
}
- this.isEmpty = !nonEmpty
- this.totalUsage = totalUsage
- this.minExit = if (isEmpty) 0 else minExit
- this.maxExit = maxExit
+ assert(currentCommand == null) { "Concurrent access to current command" }
+ currentCommand = CommandWrapper(timestamp, command)
}
/**
- * Indicate that the work on the slice has started.
+ * Request the workload for more work.
*/
- fun start() {
- usageState.value = totalUsage / model.cpus.size
+ private fun next(remainingWork: Double) {
+ process(workload.onNext(ctx, model.id, remainingWork))
}
/**
- * Flush the work performed on the slice.
+ * Start the CPU.
*/
- fun stop(duration: Long): Boolean {
- var hasFinished = true
+ fun start() {
+ try {
+ isIntermediate = true
+
+ process(workload.onStart(ctx, model.id))
+ } catch (e: Throwable) {
+ onCpuFailure(e)
+ } finally {
+ isIntermediate = false
+ }
+ }
- for (i in 0 until min(model.cpus.size, slice.burst.size)) {
- val usage = min(slice.limit[i], model.cpus[i].frequency)
- val granted = ceil(duration / 1000.0 * usage).toLong()
- val res = max(0, slice.burst[i] - granted)
- slice.burst[i] = res
+ /**
+ * Flush the work performed by the CPU.
+ */
+ fun flush() {
+ try {
+ val (timestamp, command) = currentCommand ?: return
+
+ isIntermediate = true
+ currentCommand = null
+
+ // Cancel the running task and flush the progress
+ scheduler.cancel(this)
+
+ when (command) {
+ is SimResourceCommand.Idle -> next(remainingWork = 0.0)
+ is SimResourceCommand.Consume -> {
+ val duration = clock.millis() - timestamp
+ val remainingWork = if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, command.work - processed)
+ } else {
+ 0.0
+ }
- if (res != 0L) {
- hasFinished = false
+ next(remainingWork)
+ }
+ SimResourceCommand.Exit -> throw IllegalStateException()
}
+ } catch (e: Throwable) {
+ onCpuFailure(e)
+ } finally {
+ isIntermediate = false
+ }
+ }
+
+ /**
+ * Interrupt the CPU.
+ */
+ fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isIntermediate) {
+ return
}
- return hasFinished
+ flush()
}
}
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
index 5801fcd5..c7c3d3cc 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.select
import java.time.Clock
/**
@@ -43,113 +41,10 @@ public interface SimExecutionContext {
public val machine: SimMachineModel
/**
- * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as
- * specified by [triggerMode].
+ * Ask the host machine to interrupt the specified vCPU.
*
- * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
- * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
- * the timing of this change.
- *
- * @param slice The representation of work to run on the processors.
- * @param triggerMode The trigger condition to resume execution.
- */
- public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): Unit =
- select { onRun(slice, triggerMode).invoke {} }
-
- /**
- * Ask the processors cores to run the specified [batch] of work slices and suspend execution until the trigger
- * condition is met as specified by [triggerMode].
- *
- * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
- * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
- * the timing of this change.
- *
- * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
- * slices with the next slice to be executed.
- *
- * @param batch The batch of work to run on the processors.
- * @param triggerMode The trigger condition to resume execution.
- * @param merge The merge function for consecutive slices in case the last slice was not completed within its
- * deadline.
+ * @param cpu The id of the vCPU to interrupt.
+ * @throws IllegalArgumentException if the identifier points to a non-existing vCPU.
*/
- public suspend fun run(
- batch: Sequence<Slice>,
- triggerMode: TriggerMode = TriggerMode.FIRST,
- merge: (Slice, Slice) -> Slice = { _, r -> r }
- ): Unit = select { onRun(batch, triggerMode, merge).invoke {} }
-
- /**
- * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified
- * by [triggerMode].
- *
- * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
- * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
- * the timing of this change.
- *
- * @param slice The representation of work to request from the processors.
- * @param triggerMode The trigger condition to resume execution.
- */
- public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 =
- onRun(sequenceOf(slice), triggerMode)
-
- /**
- * Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met
- * as specified by [triggerMode].
- *
- * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
- * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
- * the timing of this change.
- *
- * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
- * slices with the next slice to be executed.
- *
- * @param batch The batch of work to run on the processors.
- * @param triggerMode The trigger condition to resume execution during the **last** slice.
- * @param merge The merge function for consecutive slices in case the last slice was not completed within its
- * deadline.
- */
- public fun onRun(
- batch: Sequence<Slice>,
- triggerMode: TriggerMode = TriggerMode.FIRST,
- merge: (Slice, Slice) -> Slice = { _, r -> r }
- ): SelectClause0
-
- /**
- * A request to the host machine for a slice of CPU time from the processor cores.
- *
- * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
- * [IllegalArgumentException].
- *
- *
- * @param burst The burst time to request from each of the processor cores.
- * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this slice needs to be fulfilled.
- */
- public class Slice(public val burst: LongArray, public val limit: DoubleArray, public val deadline: Long) {
- init {
- require(burst.size == limit.size) { "Incompatible array dimensions" }
- }
- }
-
- /**
- * The modes for triggering a machine exit from the machine.
- */
- public enum class TriggerMode {
- /**
- * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the
- * deadline is reached.
- */
- FIRST,
-
- /**
- * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline
- * is reached.
- */
- LAST,
-
- /**
- * A machine exit occurs only when the deadline is reached.
- */
- DEADLINE
- }
+ public fun interrupt(cpu: Int)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
new file mode 100644
index 00000000..5e86d32b
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -0,0 +1,590 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.compute.workload.SimWorkloadBarrier
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
+ * [SimBareMetalMachine] concurrently using weighted fair sharing.
+ *
+ * @param listener The hypervisor listener to use.
+ */
+public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+
+ override fun onStart(ctx: SimExecutionContext) {
+ val model = ctx.machine
+ this.ctx = ctx
+ this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() }
+ this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray()
+ this.maxUsage = model.cpus.sumByDouble { it.frequency }
+ this.barrier = SimWorkloadBarrier(model.cpus.size)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return commands[cpu]
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ totalRemainingWork += remainingWork
+ val isLast = barrier.enter()
+
+ // Flush the progress of the guest after the barrier has been reached.
+ if (isLast && isDirty) {
+ isDirty = false
+ flushGuests()
+ }
+
+ return if (isDirty) {
+ // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
+ SimResourceCommand.Idle()
+ } else {
+ // Indicate that the scheduler needs to run next call.
+ if (isLast) {
+ isDirty = true
+ }
+
+ commands[cpu]
+ }
+ }
+
+ override fun canFit(model: SimMachineModel): Boolean = true
+
+ override fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel?
+ ): SimMachine = SimVm(model, performanceInterferenceModel)
+
+ /**
+ * The execution context in which the hypervisor runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ /**
+ * The commands to submit to the underlying host.
+ */
+ private lateinit var commands: Array<SimResourceCommand>
+
+ /**
+ * The active vCPUs.
+ */
+ private val vcpus: MutableList<VCpu> = mutableListOf()
+
+ /**
+ * The indices of the physical CPU ordered by their speed.
+ */
+ private lateinit var pCpus: IntArray
+
+ /**
+ * The maximum amount of work to be performed per second.
+ */
+ private var maxUsage: Double = 0.0
+
+ /**
+ * The current load on the hypervisor.
+ */
+ private var load: Double = 0.0
+
+ /**
+ * The total amount of remaining work (of all pCPUs).
+ */
+ private var totalRemainingWork: Double = 0.0
+
+ /**
+ * The total speed requested by the vCPUs.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the vCPUs.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the vCPUs.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the vCPUs.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the scheduler has submitted work that has not yet been completed.
+ */
+ private var isDirty: Boolean = false
+
+ /**
+ * The scheduler barrier.
+ */
+ private lateinit var barrier: SimWorkloadBarrier
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun shouldSchedule() {
+ isDirty = true
+ ctx.interruptAll()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule() {
+ // If there is no work yet, mark all pCPUs as idle.
+ if (vcpus.isEmpty()) {
+ commands.fill(SimResourceCommand.Idle())
+ ctx.interruptAll()
+ }
+
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Sort the vCPUs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ vcpus.sort()
+
+ // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
+ val vcpuIterator = vcpus.listIterator()
+ var remaining = vcpus.size
+ while (vcpuIterator.hasNext()) {
+ val vcpu = vcpuIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = vcpu.command) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ vcpu.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(vcpu.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ vcpu.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ vcpu.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> {
+ // Apparently the vCPU has exited, so remove it from the scheduling queue.
+ vcpuIterator.remove()
+ }
+ }
+ }
+
+ // Round the duration to milliseconds
+ duration = ceil(duration * 1000) / 1000
+
+ assert(deadline >= ctx.clock.millis()) { "Deadline already passed" }
+
+ val totalAllocatedSpeed = maxUsage - availableSpeed
+ var totalAllocatedWork = 0.0
+ availableSpeed = totalAllocatedSpeed
+ load = totalAllocatedSpeed / maxUsage
+
+ // Divide the requests over the available capacity of the pCPUs fairly
+ for (i in pCpus) {
+ val maxCpuUsage = ctx.machine.cpus[i].frequency
+ val fraction = maxCpuUsage / maxUsage
+ val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction)
+ val grantedWork = duration * grantedSpeed
+
+ commands[i] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ totalAllocatedWork += grantedWork
+ availableSpeed -= grantedSpeed
+ }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = totalAllocatedSpeed
+ this.totalAllocatedWork = totalAllocatedWork
+
+ ctx.interruptAll()
+ }
+
+ /**
+ * Flush the progress of the vCPUs.
+ */
+ private fun flushGuests() {
+ // Flush all the vCPUs work
+ for (vcpu in vcpus) {
+ vcpu.flush(interrupt = false)
+ }
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork.toLong(),
+ (totalAllocatedWork - totalRemainingWork).toLong(),
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalRequestedSpeed,
+ totalAllocatedSpeed
+ )
+ totalRemainingWork = 0.0
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ // Force all pCPUs to re-schedule their work.
+ doSchedule()
+ }
+
+ /**
+ * Interrupt all host CPUs.
+ */
+ private fun SimExecutionContext.interruptAll() {
+ for (i in machine.cpus.indices) {
+ interrupt(i)
+ }
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @property model The machine model of the virtual machine.
+ * @property performanceInterferenceModel The performance interference model to utilize.
+ */
+ private inner class SimVm(
+ override val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ ) : SimMachine {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
+ override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ /**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * The current active workload.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * The active CPUs of this virtual machine.
+ */
+ private var cpus: List<VCpu> = emptyList()
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimFairShareHypervisor.ctx.clock
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+
+ workload.onStart(ctx)
+
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.cpus = model.cpus.map { VCpu(this, it, workload) }
+
+ for (cpu in cpus) {
+ // Register vCPU to scheduler
+ vcpus.add(cpu)
+
+ cpu.start()
+ }
+
+ // Re-schedule the work over the pCPUs
+ shouldSchedule()
+ }
+ }
+
+ /**
+ * Terminate this VM instance.
+ */
+ override fun close() {
+ isTerminated = true
+ }
+
+ /**
+ * Update the usage of the VM.
+ */
+ fun updateUsage() {
+ usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs has exited.
+ */
+ fun onCpuExit(cpu: Int) {
+ // Check whether all other CPUs have finished
+ if (cpus.all { it.hasExited }) {
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
+ }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs failed.
+ */
+ fun onCpuFailure(e: Throwable) {
+ // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
+ // tasks.
+ val cont = cont
+ this.cont = null
+ cont?.resumeWithException(e)
+ }
+ }
+
+ /**
+ * A CPU of the virtual machine.
+ */
+ private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> {
+ /**
+ * The latest command processed by the CPU.
+ */
+ var command: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The latest timestamp at which the vCPU was flushed.
+ */
+ var latestFlush: Long = 0
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+ set(value) {
+ field = value
+ vm.updateUsage()
+ }
+
+ /**
+ * A flag to indicate that the CPU is currently processing a command.
+ */
+ var isIntermediate: Boolean = false
+
+ /**
+ * A flag to indicate that the CPU has exited.
+ */
+ val hasExited: Boolean
+ get() = command is SimResourceCommand.Exit
+
+ /**
+ * Process the specified [SimResourceCommand] for this CPU.
+ */
+ fun process(command: SimResourceCommand) {
+ // Assign command as the most recent executed command
+ this.command = command
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+
+ allowedSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+
+ allowedSpeed = min(model.frequency, command.limit)
+ }
+ is SimResourceCommand.Exit -> {
+ allowedSpeed = 0.0
+ actualSpeed = 0.0
+
+ vm.onCpuExit(model.id)
+ }
+ }
+ }
+
+ /**
+ * Start the CPU.
+ */
+ fun start() {
+ try {
+ isIntermediate = true
+ latestFlush = ctx.clock.millis()
+
+ process(workload.onStart(vm.ctx, model.id))
+ } catch (e: Throwable) {
+ fail(e)
+ } finally {
+ isIntermediate = false
+ }
+ }
+
+ /**
+ * Flush the work performed by the CPU.
+ */
+ fun flush(interrupt: Boolean) {
+ val now = ctx.clock.millis()
+
+ // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress.
+ if (latestFlush >= now) {
+ return
+ }
+
+ try {
+ isIntermediate = true
+ when (val command = command) {
+ is SimResourceCommand.Idle -> {
+ // Act like nothing has happened in case the vCPU did not reach its deadline or was not
+ // interrupted by the user.
+ if (interrupt || command.deadline <= now) {
+ process(workload.onNext(vm.ctx, model.id, 0.0))
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ // Apply performance interference model
+ val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
+
+ // Compute the remaining amount of work
+ val remainingWork = if (command.work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+ totalInterferedWork += interferedWork
+
+ max(0.0, command.work - processed)
+ } else {
+ 0.0
+ }
+
+ // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by
+ // the user.
+ if (interrupt || remainingWork == 0.0 || command.deadline <= now) {
+ if (!interrupt) {
+ totalOvercommittedWork += remainingWork
+ }
+
+ process(workload.onNext(vm.ctx, model.id, remainingWork))
+ } else {
+ process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ }
+ }
+ SimResourceCommand.Exit ->
+ throw IllegalStateException()
+ }
+ } catch (e: Throwable) {
+ fail(e)
+ } finally {
+ latestFlush = now
+ isIntermediate = false
+ }
+ }
+
+ /**
+ * Interrupt the CPU.
+ */
+ fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isIntermediate) {
+ return
+ }
+
+ flush(interrupt = true)
+
+ // Force the scheduler to re-schedule
+ shouldSchedule()
+ }
+
+ /**
+ * Fail the CPU.
+ */
+ fun fail(e: Throwable) {
+ command = SimResourceCommand.Exit
+ vm.onCpuFailure(e)
+ }
+
+ override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriverWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 58b9408a..02eb6ad0 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriverWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator
+package org.opendc.simulator.compute
-import kotlinx.coroutines.coroutineScope
-import org.opendc.simulator.compute.SimExecutionContext
-import org.opendc.simulator.compute.workload.SimWorkload
-
-public class SimVirtDriverWorkload : SimWorkload {
- public lateinit var driver: SimVirtDriver
+/**
+ * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
+ */
+public class SimFairShareHypervisorProvider : SimHypervisorProvider {
+ override val id: String = "fair-share"
- override suspend fun run(ctx: SimExecutionContext) {
- coroutineScope {
- driver = SimVirtDriver(this, ctx.clock, ctx)
- driver.run()
- }
- }
+ override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
deleted file mode 100644
index b88871a5..00000000
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.selects.select
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.time.Clock
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
- * [SimBareMetalMachine] concurrently using weighted fair sharing.
- *
- * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
- * @param clock The virtual clock to track the simulation time.
- */
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
-public class SimFairSharedHypervisor(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- private val listener: SimHypervisor.Listener? = null
-) : SimHypervisor {
- /**
- * A flag to indicate the driver is stopped.
- */
- private var stopped: Boolean = false
-
- /**
- * The channel for scheduling new CPU requests.
- */
- private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
-
- /**
- * Create a [SimMachine] instance on which users may run a [SimWorkload].
- *
- * @param model The machine to create.
- */
- override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine {
- val vm = VmSession(model, performanceInterferenceModel)
- val vmCtx = VmExecutionContext(vm)
-
- return object : SimMachine {
- override val model: SimMachineModel
- get() = vmCtx.machine
-
- override val usage: StateFlow<Double>
- get() = vm.usage
-
- /**
- * The current active workload.
- */
- private var activeWorkload: SimWorkload? = null
-
- override suspend fun run(workload: SimWorkload) {
- require(activeWorkload == null) { "Run should not be called concurrently" }
-
- try {
- activeWorkload = workload
- workload.run(vmCtx)
- } finally {
- activeWorkload = null
- }
- }
-
- override fun toString(): String = "SimVirtualMachine"
- }
- }
-
- /**
- * Run the scheduling process of the hypervisor.
- */
- override suspend fun run(ctx: SimExecutionContext) {
- val model = ctx.machine
- val maxUsage = model.cpus.sumByDouble { it.frequency }
- val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency }
-
- val vms = mutableSetOf<VmSession>()
- val vcpus = mutableListOf<VCpu>()
-
- val usage = DoubleArray(model.cpus.size)
- val burst = LongArray(model.cpus.size)
-
- fun process(command: SchedulerCommand) {
- when (command) {
- is SchedulerCommand.Schedule -> {
- vms += command.vm
- vcpus.addAll(command.vm.vcpus)
- }
- is SchedulerCommand.Deschedule -> {
- vms -= command.vm
- vcpus.removeAll(command.vm.vcpus)
- }
- is SchedulerCommand.Interrupt -> {
- }
- }
- }
-
- fun processRemaining() {
- var command = schedulingQueue.poll()
- while (command != null) {
- process(command)
- command = schedulingQueue.poll()
- }
- }
-
- while (!stopped) {
- // Wait for a request to be submitted if we have no work yet.
- if (vcpus.isEmpty()) {
- process(schedulingQueue.receive())
- }
-
- processRemaining()
-
- val start = clock.millis()
-
- var duration: Double = Double.POSITIVE_INFINITY
- var deadline: Long = Long.MAX_VALUE
- var availableUsage = maxUsage
- var totalRequestedUsage = 0.0
- var totalRequestedBurst = 0L
-
- // Sort the vCPUs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- vcpus.sort()
-
- // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- for ((i, req) in vcpus.withIndex()) {
- val remaining = vcpus.size - i
- val availableShare = availableUsage / remaining
- val grantedUsage = min(req.limit, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, req.vm.deadline)
-
- // Ignore empty CPUs
- if (grantedUsage <= 0 || req.burst <= 0) {
- req.allocatedLimit = 0.0
- continue
- }
-
- totalRequestedUsage += req.limit
- totalRequestedBurst += req.burst
-
- req.allocatedLimit = grantedUsage
- availableUsage -= grantedUsage
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, req.burst / grantedUsage)
- }
-
- val totalAllocatedUsage = maxUsage - availableUsage
- var totalAllocatedBurst = 0L
- availableUsage = totalAllocatedUsage
- val serverLoad = totalAllocatedUsage / maxUsage
-
- // / XXX Ceil duration to eliminate rounding issues
- duration = ceil(duration)
-
- // Divide the requests over the available capacity of the pCPUs fairly
- for (i in pCPUs) {
- val maxCpuUsage = model.cpus[i].frequency
- val fraction = maxCpuUsage / maxUsage
- val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
- val grantedBurst = ceil(duration * grantedUsage).toLong()
-
- usage[i] = grantedUsage
- burst[i] = grantedBurst
- totalAllocatedBurst += grantedBurst
- availableUsage -= grantedUsage
- }
-
- // XXX If none of the VMs require any computation, wait until their deadline, otherwise trigger on the
- // first vCPU finished.
- val triggerMode =
- if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0)
- SimExecutionContext.TriggerMode.FIRST
- else
- SimExecutionContext.TriggerMode.DEADLINE
-
- // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
- // time, so not all of the burst may be executed.
- val isInterrupted = select<Boolean> {
- schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode)
- .invoke { false }
- }
-
- val end = clock.millis()
-
- // The total requested burst that the VMs wanted to run in the time-frame that we ran.
- val totalRequestedSubBurst =
- vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
- val totalRemainder = burst.sum()
- val totalGrantedBurst = totalAllocatedBurst - totalRemainder
-
- // The burst that was lost due to overcommissioning of CPU resources
- var totalOvercommissionedBurst = 0L
- // The burst that was lost due to interference.
- var totalInterferedBurst = 0L
-
- val vmIterator = vms.iterator()
- while (vmIterator.hasNext()) {
- val vm = vmIterator.next()
-
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0
- var hasFinished = false
-
- for (vcpu in vm.vcpus) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = vcpu.allocatedLimit / totalAllocatedUsage
-
- // Compute the burst time that the VM was actually granted
- val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
-
- // The burst that was actually used by the VM
- val usedBurst = ceil(grantedBurst * performanceScore).toLong()
-
- totalInterferedBurst += grantedBurst - usedBurst
-
- // Compute remaining burst time to be executed for the request
- if (vcpu.consume(usedBurst)) {
- hasFinished = true
- } else if (vm.deadline <= end) {
- // Request must have its entire burst consumed or otherwise we have overcommission
- // Note that we count the overcommissioned burst if the hypervisor has failed.
- totalOvercommissionedBurst += vcpu.burst
- }
- }
-
- if (hasFinished || vm.deadline <= end) {
- // Mark the VM as finished and deschedule the VMs if needed
- if (vm.finish()) {
- vmIterator.remove()
- vcpus.removeAll(vm.vcpus)
- }
- }
- }
-
- listener?.onSliceFinish(
- this,
- totalRequestedBurst,
- min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
- totalOvercommissionedBurst,
- totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
- min(
- totalAllocatedUsage,
- totalRequestedUsage
- ), // The allocated usage might be slightly higher due to FP rounding
- totalRequestedUsage
- )
- }
- }
-
- /**
- * A scheduling command processed by the scheduler.
- */
- private sealed class SchedulerCommand {
- /**
- * Schedule the specified VM on the hypervisor.
- */
- data class Schedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * De-schedule the specified VM on the hypervisor.
- */
- data class Deschedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * Interrupt the scheduler.
- */
- object Interrupt : SchedulerCommand()
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @param ctx The execution context the vCPU runs in.
- * @param triggerMode The mode when to trigger the VM exit.
- * @param merge The function to merge consecutive slices on spillover.
- * @param select The function to select on finish.
- */
- @OptIn(InternalCoroutinesApi::class)
- private data class VmSession(
- val model: SimMachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
- var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
- var select: () -> Unit = {}
- ) {
- /**
- * The vCPUs of this virtual machine.
- */
- val vcpus: List<VCpu>
-
- /**
- * The slices that the VM wants to run.
- */
- var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
-
- /**
- * The current active slice.
- */
- var activeSlice: SimExecutionContext.Slice? = null
-
- /**
- * The current deadline of the VM.
- */
- val deadline: Long
- get() = activeSlice?.deadline ?: Long.MAX_VALUE
-
- /**
- * A flag to indicate that the VM is idle.
- */
- val isIdle: Boolean
- get() = activeSlice == null
-
- /**
- * The usage of the virtual machine.
- */
- val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- init {
- vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
- }
-
- /**
- * Schedule the given slices on this vCPU, replacing the existing slices.
- */
- fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
- queue = slices.iterator()
-
- if (queue.hasNext()) {
- activeSlice = queue.next()
- refresh()
- }
- }
-
- /**
- * Cancel the existing workload on the VM.
- */
- fun cancel() {
- queue = emptyList<SimExecutionContext.Slice>().iterator()
- activeSlice = null
- refresh()
- }
-
- /**
- * Finish the current slice of the VM.
- *
- * @return `true` if the vCPUs may be descheduled, `false` otherwise.
- */
- fun finish(): Boolean {
- val activeSlice = activeSlice ?: return true
-
- return if (queue.hasNext()) {
- val needsMerge = activeSlice.burst.any { it > 0 }
- val candidateSlice = queue.next()
- val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
-
- this.activeSlice = slice
-
- // Update the vCPU cache
- refresh()
-
- false
- } else {
- this.activeSlice = null
- select()
- true
- }
- }
-
- /**
- * Refresh the vCPU cache.
- */
- fun refresh() {
- vcpus.forEach { it.refresh() }
- usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size
- }
- }
-
- /**
- * A virtual CPU that can be scheduled on a physical CPU.
- *
- * @param vm The VM of which this vCPU is part.
- * @param model The model of CPU that this vCPU models.
- * @param id The id of the vCPU with respect to the VM.
- */
- private data class VCpu(
- val vm: VmSession,
- val model: ProcessingUnit,
- val id: Int
- ) : Comparable<VCpu> {
- /**
- * The current limit on the vCPU.
- */
- var limit: Double = 0.0
-
- /**
- * The limit allocated by the hypervisor.
- */
- var allocatedLimit: Double = 0.0
-
- /**
- * The current burst running on the vCPU.
- */
- var burst: Long = 0L
-
- /**
- * Consume the specified burst on this vCPU.
- */
- fun consume(burst: Long): Boolean {
- this.burst = max(0, this.burst - burst)
-
- // Flush the result to the slice if it exists
- vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
-
- val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false
- return actuallyExists && this.burst == 0L
- }
-
- /**
- * Refresh the information of this vCPU based on the current slice.
- */
- fun refresh() {
- limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
- burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
- }
-
- /**
- * Compare to another vCPU based on the current load of the vCPU.
- */
- override fun compareTo(other: VCpu): Int {
- return limit.compareTo(other.limit)
- }
-
- /**
- * Create a string representation of the vCPU.
- */
- override fun toString(): String =
- "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
- }
-
- /**
- * The execution context in which a VM runs.
- *
- */
- private inner class VmExecutionContext(val session: VmSession) :
- SimExecutionContext, DisposableHandle {
- override val machine: SimMachineModel
- get() = session.model
-
- override val clock: Clock
- get() = this@SimFairSharedHypervisor.clock
-
- @OptIn(InternalCoroutinesApi::class)
- override fun onRun(
- batch: Sequence<SimExecutionContext.Slice>,
- triggerMode: SimExecutionContext.TriggerMode,
- merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
- ): SelectClause0 = object : SelectClause0 {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- session.triggerMode = triggerMode
- session.merge = merge
- session.select = {
- if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion)
- }
- }
- session.schedule(batch)
- // Indicate to the hypervisor that the VM should be re-scheduled
- schedulingQueue.offer(SchedulerCommand.Schedule(session))
- select.disposeOnSelect(this@VmExecutionContext)
- }
- }
-
- override fun dispose() {
- if (!session.isIdle) {
- session.cancel()
- schedulingQueue.offer(SchedulerCommand.Deschedule(session))
- }
- }
- }
-}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
index fb4cd137..d8f00bef 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
@@ -26,14 +26,20 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
/**
- * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] i
- * concurrently.
+ * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
+ * to a [SimBareMetalMachine].
*/
public interface SimHypervisor : SimWorkload {
/**
+ * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
+ */
+ public fun canFit(model: SimMachineModel): Boolean
+
+ /**
* Create a [SimMachine] instance on which users may run a [SimWorkload].
*
* @param model The machine to create.
+ * @param performanceInterferenceModel The performance interference model to use.
*/
public fun createMachine(
model: SimMachineModel,
@@ -49,10 +55,10 @@ public interface SimHypervisor : SimWorkload {
*/
public fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
cpuUsage: Double,
cpuDemand: Double
)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
new file mode 100644
index 00000000..a5b4526b
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+/**
+ * A service provider interface for constructing a [SimHypervisor].
+ */
+public interface SimHypervisorProvider {
+ /**
+ * A unique identifier for this hypervisor implementation.
+ *
+ * Each hypervisor must provide a unique ID, so that they can be selected by the user.
+ * When in doubt, you may use the fully qualified name of your custom [SimHypervisor] implementation class.
+ */
+ public val id: String
+
+ /**
+ * Create a [SimHypervisor] instance with the specified [listener].
+ */
+ public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index f66085af..ea8eeb37 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -22,15 +22,13 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.compute.workload.SimWorkload
/**
* A generic machine that is able to run a [SimWorkload].
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-public interface SimMachine {
+public interface SimMachine : AutoCloseable {
/**
* The model of the machine containing its specifications.
*/
@@ -45,4 +43,9 @@ public interface SimMachine {
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
public suspend fun run(workload: SimWorkload)
+
+ /**
+ * Terminate this machine.
+ */
+ public override fun close()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
new file mode 100644
index 00000000..66d3eda7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.min
+
+/**
+ * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
+ *
+ * @param listener The hypervisor listener to use.
+ */
+public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+ /**
+ * The execution context in which the hypervisor runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ /**
+ * The mapping from pCPU to vCPU.
+ */
+ private lateinit var vcpus: Array<VCpu?>
+
+ /**
+ * The available physical CPUs to schedule on.
+ */
+ private val availableCpus = ArrayDeque<Int>()
+
+ override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size
+
+ override fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel?
+ ): SimMachine {
+ require(canFit(model)) { "Cannot fit machine" }
+ return SimVm(model, performanceInterferenceModel)
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx
+ this.vcpus = arrayOfNulls(ctx.machine.cpus.size)
+ this.availableCpus.addAll(ctx.machine.cpus.indices)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return onNext(ctx, cpu, 0.0)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle()
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @property model The machine model of the virtual machine.
+ * @property performanceInterferenceModel The performance interference model to utilize.
+ */
+ private inner class SimVm(
+ override val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ ) : SimMachine {
+ /**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
+ override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ /**
+ * The current active workload.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * The physical CPUs that have been allocated.
+ */
+ private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray()
+
+ /**
+ * The active CPUs of this virtual machine.
+ */
+ private var cpus: List<VCpu> = emptyList()
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+
+ workload.onStart(ctx)
+
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) }
+
+ for (cpu in cpus) {
+ cpu.start()
+ }
+ }
+ }
+
+ override fun close() {
+ isTerminated = true
+ for (pCPU in pCPUs) {
+ vcpus[pCPU] = null
+ availableCpus.add(pCPU)
+ }
+ }
+
+ /**
+ * Update the usage of the VM.
+ */
+ fun updateUsage() {
+ usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs has exited.
+ */
+ fun onCpuExit(cpu: Int) {
+ // Check whether all other CPUs have finished
+ if (cpus.all { it.hasExited }) {
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
+ }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs failed.
+ */
+ fun onCpuFailure(e: Throwable) {
+ // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
+ // tasks.
+ val cont = cont
+ this.cont = null
+ cont?.resumeWithException(e)
+ }
+ }
+
+ /**
+ * A CPU of the virtual machine.
+ */
+ private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ /**
+ * The processing speed of the vCPU.
+ */
+ var speed: Double = 0.0
+ set(value) {
+ field = value
+ vm.updateUsage()
+ }
+
+ /**
+ * A flag to indicate that the CPU has exited.
+ */
+ var hasExited: Boolean = false
+
+ /**
+ * A flag to indicate that the CPU was started.
+ */
+ var hasStarted: Boolean = false
+
+ /**
+ * Process the specified [SimResourceCommand] for this CPU.
+ */
+ fun process(command: SimResourceCommand): SimResourceCommand {
+ return when (command) {
+ is SimResourceCommand.Idle -> {
+ speed = 0.0
+ command
+ }
+ is SimResourceCommand.Consume -> {
+ speed = min(model.frequency, command.limit)
+ command
+ }
+ is SimResourceCommand.Exit -> {
+ speed = 0.0
+ hasExited = true
+
+ vm.onCpuExit(model.id)
+
+ SimResourceCommand.Idle()
+ }
+ }
+ }
+
+ /**
+ * Start the CPU.
+ */
+ fun start() {
+ vcpus[pCPU] = this
+ interrupt()
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ fun next(remainingWork: Double): SimResourceCommand {
+ return try {
+ val command =
+ if (hasStarted) {
+ workload.onNext(ctx, model.id, remainingWork)
+ } else {
+ hasStarted = true
+ workload.onStart(ctx, model.id)
+ }
+ process(command)
+ } catch (e: Throwable) {
+ fail(e)
+ }
+ }
+
+ /**
+ * Interrupt the CPU.
+ */
+ fun interrupt() {
+ ctx.interrupt(pCPU)
+ }
+
+ /**
+ * Fail the CPU.
+ */
+ fun fail(e: Throwable): SimResourceCommand {
+ hasExited = true
+
+ vm.onCpuFailure(e)
+
+ return SimResourceCommand.Idle()
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
new file mode 100644
index 00000000..3d49e544
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+/**
+ * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
+ */
+public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
+ override val id: String = "space-shared"
+
+ override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor(listener)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 0d2c9374..c22fcc07 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,37 +23,46 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimExecutionContext
-import kotlin.math.min
/**
- * A [SimWorkload] that models applications performing a static number of floating point operations ([flops]) on
- * a compute resource.
+ * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
+ * multiple cores of a compute resource.
*
* @property flops The number of floating point operations to perform for this task in MFLOPs.
- * @property cores The number of cores that the image is able to utilize.
* @property utilization A model of the CPU utilization of the application.
*/
public class SimFlopsWorkload(
public val flops: Long,
- public val cores: Int,
public val utilization: Double = 0.8
) : SimWorkload {
init {
- require(flops >= 0) { "Negative number of FLOPs" }
- require(cores > 0) { "Negative number of cores or no cores" }
+ require(flops >= 0) { "Negative number of flops" }
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- /**
- * Execute the runtime behavior based on a number of floating point operations to execute.
- */
- override suspend fun run(ctx: SimExecutionContext) {
- val cores = min(this.cores, ctx.machine.cpus.size)
- val burst = LongArray(cores) { flops / cores }
- val maxUsage = DoubleArray(cores) { i -> ctx.machine.cpus[i].frequency * utilization }
+ override fun onStart(ctx: SimExecutionContext) {}
- ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST)
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ val cores = ctx.machine.cpus.size
+ val limit = ctx.machine.cpus[cpu].frequency * utilization
+ val work = flops.toDouble() / cores
+
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.machine.cpus[cpu].frequency * utilization
+
+ return SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
}
- override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)"
+ override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt
new file mode 100644
index 00000000..41a5028e
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+/**
+ * A command that is sent to the host machine.
+ */
+public sealed class SimResourceCommand {
+ /**
+ * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline].
+ *
+ * @param work The amount of work to process on the CPU.
+ * @param limit The maximum amount of work to be processed per second.
+ * @param deadline The instant at which the work needs to be fulfilled.
+ */
+ public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
+ init {
+ require(work > 0) { "The amount of work must be positive." }
+ require(limit > 0) { "Limit must be positive." }
+ }
+ }
+
+ /**
+ * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted.
+ */
+ public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
+
+ /**
+ * An indication to the host that the vCPU has finished processing.
+ */
+ public object Exit : SimResourceCommand()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
new file mode 100644
index 00000000..00ebebce
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import org.opendc.simulator.compute.SimExecutionContext
+
+/**
+ * A [SimWorkload] that models application execution as a single duration.
+ *
+ * @property duration The duration of the workload.
+ * @property utilization The utilization of the application during runtime.
+ */
+public class SimRuntimeWorkload(
+ public val duration: Long,
+ public val utilization: Double = 0.8
+) : SimWorkload {
+ init {
+ require(duration >= 0) { "Duration must be non-negative" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {}
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ val limit = ctx.machine.cpus[cpu].frequency * utilization
+ val work = (limit / 1000) * duration
+ return SimResourceCommand.Consume(work, limit)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.machine.cpus[cpu].frequency * utilization
+ SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 7b1ddf32..deb10b98 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -23,31 +23,64 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimExecutionContext
-import kotlin.math.min
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
*/
public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload {
- override suspend fun run(ctx: SimExecutionContext) {
- var offset = ctx.clock.millis()
-
- val batch = trace.map { fragment ->
- val cores = min(fragment.cores, ctx.machine.cpus.size)
- val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage / cores }
- offset += fragment.duration
- SimExecutionContext.Slice(burst, usage, offset)
+ private var offset = 0L
+ private val iterator = trace.iterator()
+ private var fragment: Fragment? = null
+ private lateinit var barrier: SimWorkloadBarrier
+
+ override fun onStart(ctx: SimExecutionContext) {
+ barrier = SimWorkloadBarrier(ctx.machine.cpus.size)
+ fragment = nextFragment()
+ offset = ctx.clock.millis()
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return onNext(ctx, cpu, 0.0)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = offset + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ val cmd =
+ if (cpu < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ if (barrier.enter()) {
+ this.fragment = nextFragment()
+ this.offset += fragment.duration
}
- ctx.run(batch)
+ return cmd
}
override fun toString(): String = "SimTraceWorkload"
/**
+ * Obtain the next fragment.
+ */
+ private fun nextFragment(): Fragment? {
+ return if (iterator.hasNext()) {
+ iterator.next()
+ } else {
+ null
+ }
+ }
+
+ /**
* A fragment of the workload.
*/
- public data class Fragment(val time: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
+ public data class Fragment(val duration: Long, val usage: Double, val cores: Int)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index 2add8cce..6fc78d56 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -28,14 +28,31 @@ import org.opendc.simulator.compute.SimExecutionContext
* A model that characterizes the runtime behavior of some particular workload.
*
* Workloads are stateful objects that may be paused and resumed at a later moment. As such, be careful when using the
- * same [SimWorkload] from multiple contexts as only a single concurrent [run] call is expected.
+ * same [SimWorkload] from multiple contexts.
*/
public interface SimWorkload {
/**
- * Launch the workload in the specified [SimExecutionContext].
+ * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will
+ * start.
+ */
+ public fun onStart(ctx: SimExecutionContext)
+
+ /**
+ * This method is invoked when a (virtual) CPU assigned to the workload has started.
+ *
+ * @param ctx The execution context in which the workload runs.
+ * @param cpu The index of the (virtual) CPU to start.
+ * @return The command to perform on the CPU.
+ */
+ public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand
+
+ /**
+ * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline.
*
- * This method should encapsulate and characterize the runtime behavior of the instance resulting from launching
- * the workload on some machine, in terms of the resource consumption on the machine.
+ * @param ctx The execution context in which the workload runs.
+ * @param cpu The index of the (virtual) CPU to obtain the resource consumption of.
+ * @param remainingWork The remaining work that was not yet completed.
+ * @return The next command to perform on the CPU.
*/
- public suspend fun run(ctx: SimExecutionContext)
+ public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt
new file mode 100644
index 00000000..45a299be
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+/**
+ * The [SimWorkloadBarrier] is a barrier that allows workloads to wait for a select number of CPUs to complete, before
+ * proceeding its operation.
+ */
+public class SimWorkloadBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index e7fdd4b2..b8eee4f0 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
@@ -51,7 +51,7 @@ internal class SimHypervisorTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
@@ -59,27 +59,27 @@ internal class SimHypervisorTest {
}
/**
- * Test overcommissioning of a hypervisor.
+ * Test overcommitting of resources via the hypervisor with a single VM.
*/
@Test
- fun overcommission() {
+ fun testOvercommittedSingle() {
val listener = object : SimHypervisor.Listener {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
cpuUsage: Double,
cpuDemand: Double
) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
}
}
@@ -88,24 +88,84 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimFairShareHypervisor(listener)
+
+ launch {
+ machine.run(hypervisor)
+ }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workloadA) }
+ }
+
+ scope.advanceUntilIdle()
+ scope.uncaughtExceptions.forEach { it.printStackTrace() }
+
+ assertAll(
+ { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, scope.currentTime) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() {
+ val listener = object : SimHypervisor.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ scope.launch {
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
)
)
val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimFairSharedHypervisor(scope, clock, listener)
+ val hypervisor = SimFairShareHypervisor(listener)
launch {
machine.run(hypervisor)
@@ -117,13 +177,14 @@ internal class SimHypervisorTest {
}
scope.advanceUntilIdle()
+ scope.uncaughtExceptions.forEach { it.printStackTrace() }
assertAll(
- { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { Assertions.assertEquals(2082000, listener.totalRequestedBurst, "Requested Burst does not match") },
- { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") },
- { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") },
- { Assertions.assertEquals(1200001, scope.currentTime) }
+ { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, scope.currentTime) }
)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 332ca8e9..1036f1ac 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -23,15 +23,20 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -58,7 +63,7 @@ class SimMachineTest {
val machine = SimBareMetalMachine(testScope, clock, machineModel)
testScope.runBlockingTest {
- machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0))
+ machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
// Two cores execute 1000 MFlOps per second (1000 ms)
assertEquals(1000, testScope.currentTime)
@@ -72,12 +77,83 @@ class SimMachineTest {
val machine = SimBareMetalMachine(testScope, clock, machineModel)
testScope.runBlockingTest {
- machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0))
- assertEquals(1.0, machine.usage.value)
+ val res = mutableListOf<Double>()
+ val job = launch { machine.usage.toList(res) }
- // Wait for the usage to reset
- delay(1)
- assertEquals(0.0, machine.usage.value)
+ machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
+
+ job.cancel()
+ assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
+ }
+ }
+
+ @Test
+ fun testInterrupt() {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val machine = SimBareMetalMachine(testScope, clock, machineModel)
+
+ val workload = object : SimWorkload {
+ override fun onStart(ctx: SimExecutionContext) {}
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ ctx.interrupt(cpu)
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertDoesNotThrow {
+ testScope.runBlockingTest { machine.run(workload) }
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnStart() {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val machine = SimBareMetalMachine(testScope, clock, machineModel)
+
+ val workload = object : SimWorkload {
+ override fun onStart(ctx: SimExecutionContext) {}
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ throw IllegalStateException()
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ testScope.runBlockingTest { machine.run(workload) }
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val machine = SimBareMetalMachine(testScope, clock, machineModel)
+
+ val workload = object : SimWorkload {
+ override fun onStart(ctx: SimExecutionContext) {}
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ testScope.runBlockingTest { machine.run(workload) }
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
new file mode 100644
index 00000000..1a9faf11
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimRuntimeWorkload
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.time.Clock
+
+/**
+ * A test suite for the [SimSpaceSharedHypervisor].
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimSpaceSharedHypervisorTest {
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+ private lateinit var machineModel: SimMachineModel
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ machineModel = SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() {
+ val usagePm = mutableListOf<Double>()
+ val usageVm = mutableListOf<Double>()
+
+ scope.launch {
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ launch { machine.usage.toList(usagePm) }
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch {
+ val vm = hypervisor.createMachine(machineModel)
+ launch { vm.usage.toList(usageVm) }
+ vm.run(workloadA)
+ }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertAll(
+ { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } },
+ { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } },
+ { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() {
+ val duration = 5 * 60L * 1000
+ val workload = SimRuntimeWorkload(duration)
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workload) }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() {
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+
+ hypervisor.createMachine(machineModel)
+
+ assertAll(
+ { assertFalse(hypervisor.canFit(machineModel)) },
+ { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } }
+ )
+ }
+
+ scope.advanceUntilIdle()
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadSucceeds() {
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+
+ hypervisor.createMachine(machineModel).close()
+
+ assertAll(
+ { assertTrue(hypervisor.canFit(machineModel)) },
+ { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
+ )
+ }
+
+ scope.advanceUntilIdle()
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt
index 51bed76c..b3e57453 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt
@@ -32,42 +32,28 @@ class SimFlopsWorkloadTest {
@Test
fun testFlopsNonNegative() {
assertThrows<IllegalArgumentException>("FLOPs must be non-negative") {
- SimFlopsWorkload(-1, 1)
- }
- }
-
- @Test
- fun testCoresNonZero() {
- assertThrows<IllegalArgumentException>("Cores cannot be zero") {
- SimFlopsWorkload(1, 0)
- }
- }
-
- @Test
- fun testCoresPositive() {
- assertThrows<IllegalArgumentException>("Cores cannot be negative") {
- SimFlopsWorkload(1, -1)
+ SimFlopsWorkload(-1)
}
}
@Test
fun testUtilizationNonZero() {
assertThrows<IllegalArgumentException>("Utilization cannot be zero") {
- SimFlopsWorkload(1, 1, 0.0)
+ SimFlopsWorkload(1, 0.0)
}
}
@Test
fun testUtilizationPositive() {
assertThrows<IllegalArgumentException>("Utilization cannot be negative") {
- SimFlopsWorkload(1, 1, -1.0)
+ SimFlopsWorkload(1, -1.0)
}
}
@Test
fun testUtilizationNotLargerThanOne() {
assertThrows<IllegalArgumentException>("Utilization cannot be larger than one") {
- SimFlopsWorkload(1, 1, 2.0)
+ SimFlopsWorkload(1, 2.0)
}
}
}
diff --git a/simulator/opendc-utils/build.gradle.kts b/simulator/opendc-utils/build.gradle.kts
index d66148c4..d4b8c514 100644
--- a/simulator/opendc-utils/build.gradle.kts
+++ b/simulator/opendc-utils/build.gradle.kts
@@ -29,4 +29,9 @@ plugins {
dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}")
+
+ testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
+ testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
+ testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
new file mode 100644
index 00000000..ff116443
--- /dev/null
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.utils
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.sendBlocking
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.select
+import java.time.Clock
+import java.util.*
+import kotlin.math.max
+
+/**
+ * A TimerScheduler facilitates scheduled execution of future tasks.
+ *
+ * @property coroutineScope The [CoroutineScope] to run the tasks in.
+ * @property clock The clock to keep track of the time.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable {
+ /**
+ * A priority queue containing the tasks to be scheduled in the future.
+ */
+ private val queue = PriorityQueue<Timer>()
+
+ /**
+ * A map that keeps track of the timers.
+ */
+ private val timers = mutableMapOf<T, Timer>()
+
+ /**
+ * The channel to communicate with the
+ */
+ private val channel = Channel<Long?>(Channel.CONFLATED)
+
+ /**
+ * The scheduling job.
+ */
+ private val job = coroutineScope.launch {
+ val queue = queue
+ var next: Long? = channel.receive()
+
+ while (true) {
+ next = select {
+ channel.onReceive { it }
+
+ val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
+
+ onTimeout(delay) {
+ while (queue.isNotEmpty()) {
+ val timer = queue.peek()
+ val timestamp = clock.millis()
+
+ assert(timer.timestamp >= timestamp) { "Found task in the past" }
+
+ if (timer.timestamp > timestamp && !timer.isCancelled) {
+ // Schedule a task for the next event to occur.
+ return@onTimeout timer.timestamp
+ }
+
+ queue.poll()
+
+ if (!timer.isCancelled) {
+ timers.remove(timer.key)
+ timer()
+ }
+ }
+
+ null
+ }
+ }
+ }
+ }
+
+ /**
+ * Stop the scheduler.
+ */
+ override fun close() {
+ cancelAll()
+ job.cancel()
+ }
+
+ /**
+ * Cancel a timer with a given key.
+ *
+ * If canceling a timer that was already canceled, or key never was used to start
+ * a timer this operation will do nothing.
+ *
+ * @param key The key of the timer to cancel.
+ */
+ public fun cancel(key: T) {
+ if (!job.isActive) {
+ return
+ }
+
+ val timer = timers.remove(key)
+
+ // Mark the timer as cancelled
+ timer?.isCancelled = true
+
+ // Optimization: check whether we are the head of the queue
+ if (queue.peek() == timer) {
+ queue.poll()
+
+ if (queue.isNotEmpty()) {
+ channel.sendBlocking(queue.peek().timestamp)
+ } else {
+ channel.sendBlocking(null)
+ }
+ }
+ }
+
+ /**
+ * Cancel all timers.
+ */
+ public fun cancelAll() {
+ queue.clear()
+ timers.clear()
+ }
+
+ /**
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
+ */
+ public fun isTimerActive(key: T): Boolean = key in timers
+
+ /**
+ * Start a timer that will invoke the specified [block] after [delay].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
+ */
+ public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
+ startSingleTimerTo(key, clock.millis() + delay, block)
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
+ */
+ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
+ val now = clock.millis()
+
+ require(timestamp >= now) { "Timestamp must be in the future" }
+ check(job.isActive) { "Timer is stopped" }
+
+ val timer = Timer(key, timestamp, block)
+
+ timers.compute(key) { _, old ->
+ old?.isCancelled = true
+ timer
+ }
+ queue.add(timer)
+
+ // Check if we need to push the interruption forward
+ if (queue.peek() == timer) {
+ channel.sendBlocking(timer.timestamp)
+ }
+ }
+
+ /**
+ * A task that is scheduled to run in the future.
+ */
+ private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> {
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ var isCancelled: Boolean = false
+
+ /**
+ * Run the task.
+ */
+ operator fun invoke(): Unit = block()
+
+ override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp)
+ }
+}
diff --git a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
new file mode 100644
index 00000000..3a4acc90
--- /dev/null
+++ b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.utils
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+
+/**
+ * A test suite for the [TimerScheduler] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class TimerSchedulerTest {
+ @Test
+ fun testBasicTimer() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.startSingleTimer(0, 1000) {
+ scheduler.close()
+ assertEquals(1000, clock.millis())
+ }
+ }
+ }
+
+ @Test
+ fun testCancelNonExisting() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.cancel(1)
+ scheduler.close()
+ }
+ }
+
+ @Test
+ fun testCancelExisting() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.startSingleTimer(0, 1000) {
+ assertFalse(false)
+ }
+
+ scheduler.startSingleTimer(1, 100) {
+ scheduler.cancel(0)
+ scheduler.close()
+
+ assertEquals(100, clock.millis())
+ }
+ }
+ }
+
+ @Test
+ fun testCancelAll() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.startSingleTimer(0, 1000) {
+ assertFalse(false)
+ }
+
+ scheduler.startSingleTimer(1, 100) {
+ assertFalse(false)
+ }
+
+ scheduler.close()
+ }
+ }
+
+ @Test
+ fun testOverride() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.startSingleTimer(0, 1000) {
+ assertFalse(false)
+ }
+
+ scheduler.startSingleTimer(0, 200) {
+ scheduler.close()
+
+ assertEquals(200, clock.millis())
+ }
+ }
+ }
+
+ @Test
+ fun testStopped() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ scheduler.close()
+
+ assertThrows<IllegalStateException> {
+ scheduler.startSingleTimer(1, 100) {
+ assertFalse(false)
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testNegativeDelay() {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Int>(this, clock)
+
+ assertThrows<IllegalArgumentException> {
+ scheduler.startSingleTimer(1, -1) {
+ assertFalse(false)
+ }
+ }
+
+ scheduler.close()
+ }
+ }
+}
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index 4346efcc..e9c85de5 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -41,6 +41,7 @@ dependencies {
exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
}
testImplementation(kotlin("reflect"))
+ testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 2c8d9a0b..e04c8a4c 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -340,7 +340,7 @@ public class StageWorkflowService(
}
}
- private suspend fun finishJob(job: JobState) {
+ private fun finishJob(job: JobState) {
activeJobs -= job
tracer.commit(WorkflowEvent.JobFinished(this, job.job))
rootListener.jobFinished(job)
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index b97cb915..2bfcba35 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -41,6 +41,7 @@ import org.opendc.compute.simulator.SimVirtProvisioningService
import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
@@ -59,7 +60,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
* A large integration test where we check whether all tasks in some trace are executed correctly.
*/
@Test
- fun `should execute all tasks in trace`() {
+ fun testTrace() {
var jobsSubmitted = 0L
var jobsStarted = 0L
var jobsFinished = 0L
@@ -79,7 +80,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000)
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
// Wait for the hypervisors to be spawned
delay(10)