diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 14:01:55 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-09 14:01:55 +0100 |
| commit | 66c2501d95b167f9e7474a45e542f82d2d8e83ff (patch) | |
| tree | 7c3464a424891ab7c3cb9c0ac77d67256b144f97 /simulator/opendc-simulator/opendc-simulator-compute/src | |
| parent | 2977dd8a5f1d742193eae79364a284e68269f7b5 (diff) | |
| parent | 75751865179c6cd5a05abb4a0641193595f59b45 (diff) | |
compute: Improvements to cloud compute model (v1)
This is the first of the pull requests in an attempt to improve the existing cloud compute model (see #86). This pull request restructures the compute API and splits the consumer and service interfaces into different modules:
- opendc-compute-api now defines the API interface for the OpenDC Compute module, which can be used by consumers of the OpenDC Compute service.
- opendc-compute-service hosts the service implementation for OpenDC Compute and contains all business logic regarding the IaaS platform (such as scheduling).
- opendc-compute-simulator implements a "compute driver" for the OpenDC Compute platform that simulates submitted workloads.
- Image is now a data-class and does not specify itself the workload to simulate. Instead, the workload should be passed via its tags currently (with key "workload"). In the future, the simulation backend will accept a mapper interface that maps Images to Workloads.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute/src')
5 files changed, 72 insertions, 62 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 812b5f20..f74c5697 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,33 +84,33 @@ public class SimBareMetalMachine( private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { Cpu(it, workload) } + this.cpus = model.cpus.map { Cpu(ctx, it, workload) } for (cpu in cpus) { cpu.start() @@ -161,7 +161,7 @@ public class SimBareMetalMachine( /** * A physical CPU of the machine. */ - private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) { + private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { /** * The current command. */ diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt index c7c3d3cc..657dac66 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -41,6 +41,11 @@ public interface SimExecutionContext { public val machine: SimMachineModel /** + * The metadata associated with the context. + */ + public val meta: Map<String, Any> + + /** * Ask the host machine to interrupt the specified vCPU. * * @param cpu The id of the vCPU to interrupt. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 5e86d32b..bf6d8a5e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -336,33 +336,33 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, it, workload) } + this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } for (cpu in cpus) { // Register vCPU to scheduler @@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> { + private inner class VCpu( + val vm: SimVm, + val ctx: SimExecutionContext, + val model: ProcessingUnit, + val workload: SimWorkload + ) : Comparable<VCpu> { /** * The latest command processed by the CPU. */ @@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener isIntermediate = true latestFlush = ctx.clock.millis() - process(workload.onStart(vm.ctx, model.id)) + process(workload.onStart(ctx, model.id)) } catch (e: Throwable) { fail(e) } finally { @@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Act like nothing has happened in case the vCPU did not reach its deadline or was not // interrupted by the user. if (interrupt || command.deadline <= now) { - process(workload.onNext(vm.ctx, model.id, 0.0)) + process(workload.onNext(ctx, model.id, 0.0)) } } is SimResourceCommand.Consume -> { @@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener totalOvercommittedWork += remainingWork } - process(workload.onNext(vm.ctx, model.id, remainingWork)) + process(workload.onNext(ctx, model.id, remainingWork)) } else { process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ea8eeb37..bfaa60bc 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable { /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) + public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) /** * Terminate this machine. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 66d3eda7..778b68ca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -117,33 +117,33 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) } + this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } for (cpu in cpus) { cpu.start() @@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { /** * The processing speed of the vCPU. */ @@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen * Interrupt the CPU. */ fun interrupt() { - ctx.interrupt(pCPU) + this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) } /** |
