summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt22
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt8
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt18
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt20
-rw-r--r--opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt28
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt150
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt143
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt110
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt71
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt)44
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt)13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt)25
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt76
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt55
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt43
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt40
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt)19
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt)59
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt38
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt115
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt362
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt131
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt)53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt)27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt386
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt99
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt81
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt99
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt74
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt422
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt)22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt331
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt63
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt93
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt71
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt32
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt5
65 files changed, 2313 insertions, 1697 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 68667a8c..d36717af 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -32,15 +32,16 @@ import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.ScalingDriver
import org.opendc.simulator.compute.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.failures.FailureDomain
+import org.opendc.simulator.resources.SimResourceInterpreter
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -59,7 +60,7 @@ public class SimHost(
meter: Meter,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver,
+ scalingDriver: PowerDriver,
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
) : Host, FailureDomain, AutoCloseable {
@@ -74,7 +75,7 @@ public class SimHost(
hypervisor: SimHypervisorProvider,
powerModel: PowerModel = ConstantPowerModel(0.0),
mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper)
+ ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper)
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
@@ -94,19 +95,24 @@ public class SimHost(
/**
* Current total memory use of the images on this hypervisor.
*/
- private var availableMemory: Long = model.memory.map { it.size }.sum()
+ private var availableMemory: Long = model.memory.sumOf { it.size }
+
+ /**
+ * The resource interpreter to schedule the resource interactions.
+ */
+ private val interpreter = SimResourceInterpreter(context, clock)
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver)
/**
* The hypervisor to run multiple workloads.
*/
public val hypervisor: SimHypervisor = hypervisor.create(
- scope.coroutineContext, clock,
- object : SimHypervisor.Listener {
+ interpreter,
+ listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
requestedWork: Long,
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5594fd59..a6cff3ba 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -124,9 +124,18 @@ internal class SimHostTest {
object : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
- requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
- grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
- overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
+ val totalWork = metricsByName["cpu.work.total"]
+ if (totalWork != null) {
+ requestedWork += totalWork.doubleSummaryData.points.first().sum.toLong()
+ }
+ val grantedWorkCycle = metricsByName["cpu.work.granted"]
+ if (grantedWorkCycle != null) {
+ grantedWork += grantedWorkCycle.doubleSummaryData.points.first().sum.toLong()
+ }
+ val overcommittedWorkCycle = metricsByName["cpu.work.overcommit"]
+ if (overcommittedWorkCycle != null) {
+ overcommittedWork += overcommittedWorkCycle.doubleSummaryData.points.first().sum.toLong()
+ }
return CompletableResultCode.ofSuccess()
}
@@ -160,8 +169,8 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4197600, requestedWork, "Requested work does not match") },
- { assertEquals(2157600, grantedWork, "Granted work does not match") },
+ { assertEquals(4147200, requestedWork, "Requested work does not match") },
+ { assertEquals(2107200, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1500001, clock.millis()) }
)
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 7c7f0dad..0dade513 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -48,4 +48,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
+ testImplementation(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 2d5cc68c..4b21b4f7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -113,8 +113,8 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
@@ -150,8 +150,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
{ assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 7460a1e7..37e10580 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -173,23 +173,23 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
*/
public enum class PowerModelType {
CUBIC {
- override val driver: ScalingDriver = SimpleScalingDriver(CubicPowerModel(206.0, 56.4))
+ override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4))
},
LINEAR {
- override val driver: ScalingDriver = SimpleScalingDriver(LinearPowerModel(206.0, 56.4))
+ override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4))
},
SQRT {
- override val driver: ScalingDriver = SimpleScalingDriver(SqrtPowerModel(206.0, 56.4))
+ override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4))
},
SQUARE {
- override val driver: ScalingDriver = SimpleScalingDriver(SquarePowerModel(206.0, 56.4))
+ override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4))
},
INTERPOLATION {
- override val driver: ScalingDriver = SimpleScalingDriver(
+ override val driver: PowerDriver = SimplePowerDriver(
InterpolationPowerModel(
listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0)
)
@@ -197,17 +197,17 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
},
MSE {
- override val driver: ScalingDriver = SimpleScalingDriver(MsePowerModel(206.0, 56.4, 1.4))
+ override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4))
},
ASYMPTOTIC {
- override val driver: ScalingDriver = SimpleScalingDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false))
+ override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false))
},
ASYMPTOTIC_DVFS {
- override val driver: ScalingDriver = SimpleScalingDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true))
+ override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true))
};
- public abstract val driver: ScalingDriver
+ public abstract val driver: PowerDriver
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index f4c18ff1..001547ef 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -29,16 +29,12 @@ import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceCommand
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.resources.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.Continuation
@@ -67,8 +63,8 @@ public class SimTFDevice(
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine(
- scope.coroutineContext, clock, SimMachineModel(listOf(pu), listOf(memory)),
- PerformanceScalingGovernor(), SimpleScalingDriver(powerModel)
+ SimResourceInterpreter(scope.coroutineContext, clock), SimMachineModel(listOf(pu), listOf(memory)),
+ SimplePowerDriver(powerModel)
)
/**
@@ -119,9 +115,11 @@ public class SimTFDevice(
*/
private var activeWork: Work? = null
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer = this
+ override fun onStart(ctx: SimMachineContext) {
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(this)
+ }
+ }
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val activeWork = activeWork
diff --git a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
index b5516b4d..32e5f75e 100644
--- a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
+++ b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
@@ -34,9 +34,9 @@ import org.opendc.serverless.simulator.workload.SimServerlessWorkloadMapper
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceInterpreter
import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
@@ -72,7 +72,11 @@ public class SimFunctionDeployer(
/**
* The machine that will execute the workloads.
*/
- public val machine: SimMachine = SimBareMetalMachine(scope.coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ public val machine: SimMachine = SimBareMetalMachine(
+ SimResourceInterpreter(scope.coroutineContext, clock),
+ model,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
/**
* The job associated with the lifecycle of the instance.
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 15714aca..fb753de2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -25,17 +25,15 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceScheduler
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import org.opendc.simulator.resources.SimResourceInterpreter
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -46,13 +44,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: SimResourceScheduler
+ private lateinit var interpreter: SimResourceInterpreter
private lateinit var machineModel: SimMachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
+ interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -85,8 +83,7 @@ class SimMachineBenchmarks {
fun benchmarkBareMetal(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
}
@@ -96,10 +93,9 @@ class SimMachineBenchmarks {
fun benchmarkSpaceSharedHypervisor(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
@@ -118,10 +114,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorSingle(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(scheduler)
+ val hypervisor = SimFairShareHypervisor(interpreter)
launch { machine.run(hypervisor) }
@@ -140,10 +135,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorDouble(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(scheduler)
+ val hypervisor = SimFairShareHypervisor(interpreter)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index 713376e7..57c25b86 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -22,21 +22,22 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import java.time.Clock
+import org.opendc.simulator.resources.SimResourceSwitch
/**
* Abstract implementation of the [SimHypervisor] interface.
+ *
+ * @param interpreter The resource interpreter to use.
+ * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
*/
-public abstract class SimAbstractHypervisor : SimHypervisor {
+public abstract class SimAbstractHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val scalingGovernor: ScalingGovernor?
+) : SimHypervisor {
/**
* The machine on which the hypervisor runs.
*/
@@ -55,6 +56,11 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
get() = _vms
/**
+ * The scaling governors attached to the physical CPUs backing this hypervisor.
+ */
+ private val governors = mutableListOf<ScalingGovernor.Logic>()
+
+ /**
* Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
*/
public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch
@@ -64,6 +70,16 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
*/
public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean
+ /**
+ * Trigger the governors to recompute the scaling limits.
+ */
+ protected fun triggerGovernors(load: Double) {
+ for (governor in governors) {
+ governor.onLimit(load)
+ }
+ }
+
+ /* SimHypervisor */
override fun canFit(model: SimMachineModel): Boolean {
return canFit(model, switch)
}
@@ -78,6 +94,21 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
return vm
}
+ /* SimWorkload */
+ override fun onStart(ctx: SimMachineContext) {
+ context = ctx
+ switch = createSwitch(ctx)
+
+ for (cpu in ctx.cpus) {
+ val governor = scalingGovernor?.createLogic(cpu)
+ if (governor != null) {
+ governors.add(governor)
+ governor.onStart()
+ }
+ switch.addInput(cpu)
+ }
+ }
+
/**
* A virtual machine running on the hypervisor.
*
@@ -85,105 +116,34 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
* @property performanceInterferenceModel The performance interference model to utilize.
*/
private inner class VirtualMachine(
- override val model: SimMachineModel,
+ model: SimMachineModel,
val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
- override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- /**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
+ ) : SimAbstractMachine(interpreter, parent = null, model) {
/**
* The vCPUs of the machine.
*/
- private val cpus = model.cpus.map { ProcessingUnitImpl(it, switch) }
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- coroutineScope {
- require(!isTerminated) { "Machine is terminated" }
-
- val ctx = object : SimMachineContext {
- override val cpus: List<SimProcessingUnit> = this@VirtualMachine.cpus
-
- override val memory: List<MemoryUnit>
- get() = model.memory
+ override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) }
- override val clock: Clock
- get() = this@SimAbstractHypervisor.context.clock
-
- override val meta: Map<String, Any> = meta
- }
-
- workload.onStart(ctx)
-
- for (cpu in cpus) {
- launch {
- cpu.consume(workload.getConsumer(ctx, cpu.model))
- }
- }
- }
- }
-
- /**
- * Terminate this VM instance.
- */
override fun close() {
- if (!isTerminated) {
- isTerminated = true
+ super.close()
- cpus.forEach(SimProcessingUnit::close)
- _vms.remove(this)
- }
+ _vms.remove(this)
}
}
- override fun onStart(ctx: SimMachineContext) {
- context = ctx
- switch = createSwitch(ctx)
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val forwarder = SimResourceForwarder()
- switch.addInput(forwarder)
- return forwarder
- }
-
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a virtual machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit, switch: SimResourceSwitch) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = switch.addOutput(model.frequency)
-
- override val speed: Double = 0.0 /* TODO Implement */
-
- override val state: SimResourceState
- get() = source.state
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
+ private class VCpu(
+ private val source: SimResourceProvider,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override var capacity: Double
+ get() = source.capacity
+ set(_) {
+ // Ignore capacity changes
+ }
- override fun close() {
- source.close()
- }
+ override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index e501033a..e12ac72b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -27,17 +27,27 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.consume
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
+ *
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
+ * @param model The model of the machine.
*/
-public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine {
+public abstract class SimAbstractMachine(
+ protected val interpreter: SimResourceInterpreter,
+ final override val parent: SimResourceSystem?,
+ final override val model: SimMachineModel
+) : SimMachine, SimResourceSystem {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
private val _usage = MutableStateFlow(0.0)
- override val usage: StateFlow<Double>
+ public final override val usage: StateFlow<Double>
get() = _usage
/**
@@ -48,67 +58,76 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
private var _speed = doubleArrayOf()
/**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
- * The [CoroutineContext] to run in.
- */
- protected abstract val context: CoroutineContext
-
- /**
* The resources allocated for this machine.
*/
protected abstract val cpus: List<SimProcessingUnit>
/**
- * The execution context in which the workload runs.
+ * A flag to indicate that the machine is terminated.
*/
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val clock: Clock
- get() = this@SimAbstractMachine.clock
-
- override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+ private var isTerminated = false
- override val memory: List<MemoryUnit> = model.memory
- }
+ /**
+ * The continuation to resume when the virtual machine workload has finished.
+ */
+ private var cont: Continuation<Unit>? = null
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
- require(!isTerminated) { "Machine is terminated" }
- val ctx = Context(meta)
- val totalCapacity = model.cpus.sumOf { it.frequency }
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ check(!isTerminated) { "Machine is terminated" }
+ check(cont == null) { "A machine cannot run concurrently" }
- _speed = DoubleArray(model.cpus.size) { 0.0 }
- var totalSpeed = 0.0
+ val ctx = Context(meta)
// Before the workload starts, initialize the initial power draw
+ _speed = DoubleArray(model.cpus.size) { 0.0 }
updateUsage(0.0)
- workload.onStart(ctx)
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
- for (cpu in cpus) {
- val model = cpu.model
- val consumer = workload.getConsumer(ctx, model)
- val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed ->
- val _speed = _speed
- val _usage = _usage
-
- val oldSpeed = _speed[model.id]
- _speed[model.id] = newSpeed
- totalSpeed = totalSpeed - oldSpeed + newSpeed
-
- val newUsage = totalSpeed / totalCapacity
- if (_usage.value != newUsage) {
- updateUsage(totalSpeed / totalCapacity)
+ // Cancel all cpus on cancellation
+ cont.invokeOnCancellation {
+ this.cont = null
+
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
}
}
- launch { cpu.consume(adapter) }
+ interpreter.batch { workload.onStart(ctx) }
+ }
+ }
+
+ override fun close() {
+ if (isTerminated) {
+ return
+ }
+
+ isTerminated = true
+ cancel()
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.close()
+ }
+ }
+ }
+
+ /* SimResourceSystem */
+ override fun onConverge(timestamp: Long) {
+ val totalCapacity = model.cpus.sumOf { it.frequency }
+ val cpus = cpus
+ var totalSpeed = 0.0
+ for (cpu in cpus) {
+ _speed[cpu.model.id] = cpu.speed
+ totalSpeed += cpu.speed
}
+
+ updateUsage(totalSpeed / totalCapacity)
}
/**
@@ -118,10 +137,34 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
_usage.value = usage
}
- override fun close() {
- if (!isTerminated) {
- isTerminated = true
- cpus.forEach(SimProcessingUnit::close)
+ /**
+ * Cancel the workload that is currently running on the machine.
+ */
+ private fun cancel() {
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+ }
+
+ val cont = cont
+ if (cont != null) {
+ this.cont = null
+ cont.resume(Unit)
}
}
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val interpreter: SimResourceInterpreter
+ get() = this@SimAbstractMachine.interpreter
+
+ override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+
+ override val memory: List<MemoryUnit> = model.memory
+
+ override fun close() = cancel()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 27ebba21..5d5d1e5a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -22,62 +22,36 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
-import org.opendc.simulator.compute.cpufreq.ScalingDriver
-import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.resources.*
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
-import kotlin.coroutines.*
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A simulated bare-metal machine that is able to run a single workload.
*
* A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
- * example. the class expects only a single concurrent call to [run].
+ * example. The class expects only a single concurrent call to [run].
*
- * @param context The [CoroutineContext] to run the simulated workload in.
- * @param clock The virtual clock to track the simulation time.
+ * @param interpreter The [SimResourceInterpreter] to drive the simulation.
* @param model The machine model to simulate.
+ * @param powerDriver The power driver to use.
+ * @param parent The parent simulation system.
*/
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
- context: CoroutineContext,
- private val clock: Clock,
- override val model: SimMachineModel,
- scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver
-) : SimAbstractMachine(clock) {
- /**
- * The [Job] associated with this machine.
- */
- private val scope = CoroutineScope(context + Job())
-
- override val context: CoroutineContext = scope.coroutineContext
-
- /**
- * The [TimerScheduler] to use for scheduling the interrupts.
- */
- private val scheduler = SimResourceSchedulerTrampoline(this.context, clock)
-
- override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
+ interpreter: SimResourceInterpreter,
+ model: SimMachineModel,
+ powerDriver: PowerDriver,
+ parent: SimResourceSystem? = null,
+) : SimAbstractMachine(interpreter, parent, model) {
+ override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu ->
+ Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu)
+ }
/**
- * Construct the [ScalingDriver.Logic] for this machine.
+ * Construct the [PowerDriver.Logic] for this machine.
*/
- private val scalingDriver = scalingDriver.createLogic(this)
-
- /**
- * The scaling contexts associated with each CPU.
- */
- private val scalingGovernors = cpus.map { cpu ->
- scalingGovernor.createLogic(this.scalingDriver.createContext(cpu))
- }
-
- init {
- scalingGovernors.forEach { it.onStart() }
- }
+ private val powerDriver = powerDriver.createLogic(this, cpus)
/**
* The power draw of the machine.
@@ -88,45 +62,25 @@ public class SimBareMetalMachine(
override fun updateUsage(usage: Double) {
super.updateUsage(usage)
- scalingGovernors.forEach { it.onLimit() }
- powerDraw = scalingDriver.computePower()
- }
-
- override fun close() {
- super.close()
-
- scope.cancel()
+ powerDraw = powerDriver.computePower()
}
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a bare-metal machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = SimResourceSource(model.frequency, scheduler)
-
- override val speed: Double
- get() = source.speed
-
- override val state: SimResourceState
- get() = source.state
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
-
- override fun close() {
- source.interrupt()
- }
+ private class Cpu(
+ private val source: SimResourceSource,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override var capacity: Double
+ get() = source.capacity
+ set(value) {
+ // Clamp the capacity of the CPU between [0.0, maxFreq]
+ if (value >= 0.0 && value <= model.frequency) {
+ source.capacity = value
+ }
+ }
+
+ override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 11aec2de..e7776c81 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -22,35 +22,72 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.resources.SimResourceSwitchMaxMin
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
* [SimBareMetalMachine] concurrently using weighted fair sharing.
*
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
+ * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+public class SimFairShareHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null,
+ scalingGovernor: ScalingGovernor? = null,
+ private val listener: SimHypervisor.Listener? = null
+) : SimAbstractHypervisor(interpreter, scalingGovernor) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchMaxMin(
- scheduler,
- object : SimResourceSwitchMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
+ return SwitchSystem(ctx).switch
+ }
+
+ private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem {
+ val switch = SimResourceSwitchMaxMin(interpreter, this)
+
+ override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+
+ private var lastCpuUsage = 0.0
+ private var lastCpuDemand = 0.0
+ private var lastDemand = 0.0
+ private var lastActual = 0.0
+ private var lastOvercommit = 0.0
+ private var lastReport = Long.MIN_VALUE
+
+ override fun onConverge(timestamp: Long) {
+ val listener = listener ?: return
+ val counters = switch.counters
+
+ if (timestamp > lastReport) {
+ listener.onSliceFinish(
+ this@SimFairShareHypervisor,
+ (counters.demand - lastDemand).toLong(),
+ (counters.actual - lastActual).toLong(),
+ (counters.overcommit - lastOvercommit).toLong(),
+ 0L,
+ lastCpuUsage,
+ lastCpuDemand
+ )
}
- )
+ lastReport = timestamp
+
+ lastCpuDemand = switch.inputs.sumOf { it.demand }
+ lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastDemand = counters.demand
+ lastActual = counters.actual
+ lastOvercommit = counters.overcommit
+
+ val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency }
+ triggerGovernors(load)
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 2ab3ea09..94c905b2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -22,9 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -32,7 +31,9 @@ import kotlin.coroutines.CoroutineContext
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index b66020f4..8e8c3698 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -40,5 +40,9 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null,
+ listener: SimHypervisor.Listener? = null
+ ): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index c2523a2a..5cbabc86 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -23,18 +23,18 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.MemoryUnit
-import java.time.Clock
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A simulated execution context in which a bootable image runs. This interface represents the
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
* which the image runs.
*/
-public interface SimMachineContext {
+public interface SimMachineContext : AutoCloseable {
/**
- * The virtual clock tracking simulation time.
+ * The resource interpreter that simulates the machine.
*/
- public val clock: Clock
+ public val interpreter: SimResourceInterpreter
/**
* The metadata associated with the context.
@@ -50,4 +50,9 @@ public interface SimMachineContext {
* The memory available on the machine
*/
public val memory: List<MemoryUnit>
+
+ /**
+ * Stop the workload.
+ */
+ public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 13c7d9b2..93c9ddfa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -30,12 +30,12 @@ import org.opendc.simulator.resources.SimResourceProvider
*/
public interface SimProcessingUnit : SimResourceProvider {
/**
- * The model representing the static properties of the processing unit.
+ * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine.
*/
- public val model: ProcessingUnit
+ public override var capacity: Double
/**
- * The current speed of the processing unit.
+ * The model representing the static properties of the processing unit.
*/
- public val speed: Double
+ public val model: ProcessingUnit
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index fd8e546f..f6ae18f7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -22,12 +22,14 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.resources.SimResourceSwitchExclusive
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
+public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean {
return switch.inputs.size - switch.outputs.size >= model.cpus.size
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index 83b924d7..923b5bab 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -31,7 +31,9 @@ import kotlin.coroutines.CoroutineContext
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimSpaceSharedHypervisor()
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
index 96f8775a..245877be 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
@@ -22,13 +22,15 @@
package org.opendc.simulator.compute.cpufreq
+import org.opendc.simulator.compute.SimProcessingUnit
+
/**
* A CPUFreq [ScalingGovernor] that causes the highest possible frequency to be requested from the resource.
*/
public class PerformanceScalingGovernor : ScalingGovernor {
- override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
- override fun onLimit() {
- ctx.setTarget(ctx.cpu.model.frequency)
+ override fun createLogic(cpu: SimProcessingUnit): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
+ override fun onStart() {
+ cpu.capacity = cpu.model.frequency
}
override fun toString(): String = "PerformanceScalingGovernor.Logic"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
index c9aea580..b7e7ffc6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.cpufreq
+import org.opendc.simulator.compute.SimProcessingUnit
+
/**
* A [ScalingGovernor] in the CPUFreq subsystem of OpenDC is responsible for scaling the frequency of simulated CPUs
* independent of the particular implementation of the CPU.
@@ -33,9 +35,9 @@ package org.opendc.simulator.compute.cpufreq
*/
public interface ScalingGovernor {
/**
- * Create the scaling logic for the specified [context]
+ * Create the scaling logic for the specified [cpu]
*/
- public fun createLogic(ctx: ScalingContext): Logic
+ public fun createLogic(cpu: SimProcessingUnit): Logic
/**
* The logic of the scaling governor.
@@ -48,7 +50,9 @@ public interface ScalingGovernor {
/**
* This method is invoked when the governor should re-decide the frequency limits.
+ *
+ * @param load The load of the system.
*/
- public fun onLimit() {}
+ public fun onLimit(load: Double) {}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
index 6f44d778..6328c8e4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
@@ -20,67 +20,41 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.PowerModel
import java.util.*
import kotlin.math.max
import kotlin.math.min
/**
- * A [ScalingDriver] that scales the frequency of the processor based on a discrete set of frequencies.
+ * A [PowerDriver] that computes the power draw using multiple [PowerModel]s based on multiple frequency states.
*
* @param states A map describing the states of the driver.
*/
-public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDriver {
+public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver {
/**
* The P-States defined by the user and ordered by key.
*/
private val states = TreeMap(states)
- override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic {
- /**
- * The scaling contexts.
- */
- private val contexts = mutableListOf<ScalingContextImpl>()
-
- override fun createContext(cpu: SimProcessingUnit): ScalingContext {
- val ctx = ScalingContextImpl(machine, cpu)
- contexts.add(ctx)
- return ctx
- }
-
+ override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic {
override fun computePower(): Double {
var targetFreq = 0.0
var totalSpeed = 0.0
- for (ctx in contexts) {
- targetFreq = max(ctx.target, targetFreq)
- totalSpeed += ctx.cpu.speed
+ for (cpu in cpus) {
+ targetFreq = max(cpu.capacity, targetFreq)
+ totalSpeed += cpu.speed
}
val maxFreq = states.lastKey()
val (actualFreq, model) = states.ceilingEntry(min(maxFreq, targetFreq))
- val utilization = totalSpeed / (actualFreq * contexts.size)
+ val utilization = totalSpeed / (actualFreq * cpus.size)
return model.computePower(utilization)
}
- override fun toString(): String = "PStateScalingDriver.Logic"
- }
-
- private class ScalingContextImpl(
- override val machine: SimMachine,
- override val cpu: SimProcessingUnit
- ) : ScalingContext {
- var target = cpu.model.frequency
- private set
-
- override fun setTarget(freq: Double) {
- target = freq
- }
-
- override fun toString(): String = "PStateScalingDriver.Context"
+ override fun toString(): String = "PStatePowerDriver.Logic"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt
index b4fd7550..a1a2b911 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt
@@ -20,30 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
/**
- * A [ScalingDriver] is responsible for switching the processor to the correct frequency.
+ * A [PowerDriver] is responsible for switching the processor to the correct frequency.
*/
-public interface ScalingDriver {
+public interface PowerDriver {
/**
* Create the scaling logic for the specified [machine]
*/
- public fun createLogic(machine: SimMachine): Logic
+ public fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): Logic
/**
* The logic of the scaling driver.
*/
public interface Logic {
/**
- * Create the [ScalingContext] for the specified [cpu] instance.
- */
- public fun createContext(cpu: SimProcessingUnit): ScalingContext
-
- /**
* Compute the power consumption of the processor.
*
* @return The power consumption of the processor in W.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
index 18338079..5c5ceff5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
@@ -20,27 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
/**
- * A [ScalingContext] is used to communicate frequency scaling changes between the [ScalingGovernor] and driver.
+ * A [PowerDriver] that computes the power consumption based on a single specified [power model][model].
*/
-public interface ScalingContext {
- /**
- * The machine the processing unit belongs to.
- */
- public val machine: SimMachine
+public class SimplePowerDriver(private val model: PowerModel) : PowerDriver {
+ override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic {
+ override fun computePower(): Double {
+ return model.computePower(machine.usage.value)
+ }
- /**
- * The processing unit associated with this context.
- */
- public val cpu: SimProcessingUnit
-
- /**
- * Target the processor to run at the specified target [frequency][freq].
- */
- public fun setTarget(freq: Double)
+ override fun toString(): String = "SimplePowerDriver.Logic"
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
new file mode 100644
index 00000000..43662d93
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.util
+
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
+
+/**
+ * A helper class to manage the lifecycle of a [SimWorkload]
+ */
+public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
+ /**
+ * The resource consumers which represent the lifecycle of the workload.
+ */
+ private val waiting = mutableSetOf<SimResourceConsumer>()
+
+ /**
+ * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ */
+ public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer {
+ waiting.add(consumer)
+ return object : SimResourceConsumer by consumer {
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ try {
+ consumer.onEvent(ctx, event)
+ } finally {
+ if (event == SimResourceEvent.Exit) {
+ complete(consumer)
+ }
+ }
+ }
+
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ try {
+ consumer.onFailure(ctx, cause)
+ } finally {
+ complete(consumer)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
+ }
+ }
+
+ /**
+ * Complete the specified [SimResourceConsumer].
+ */
+ private fun complete(consumer: SimResourceConsumer) {
+ if (waiting.remove(consumer) && waiting.isEmpty()) {
+ ctx.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 63c9d28c..de6832ca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -43,10 +42,11 @@ public class SimFlopsWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)))
+ }
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index a3420e32..318a6b49 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -42,11 +41,12 @@ public class SimRuntimeWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val limit = cpu.frequency * utilization
- return SimWorkConsumer((limit / 1000) * duration, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ val limit = cpu.capacity * utilization
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization)))
+ }
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index ffb332d1..6929f4d2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -44,33 +45,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
barrier = SimConsumerBarrier(ctx.cpus.size)
fragment = nextFragment()
- offset = ctx.clock.millis()
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
-
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ offset = ctx.interpreter.clock.millis()
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
- }
+ val lifecycle = SimWorkloadLifecycle(ctx)
- return cmd
- }
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
}
}
@@ -87,6 +67,31 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
}
}
+ private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val usage = fragment.usage / fragment.cores
+ val work = (fragment.duration / 1000) * usage
+ val deadline = offset + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ val cmd =
+ if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
+
+ return cmd
+ }
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index bdc12bb5..b80665fa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -23,8 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
/**
* A model that characterizes the runtime behavior of some particular workload.
@@ -35,11 +33,8 @@ import org.opendc.simulator.resources.SimResourceConsumer
public interface SimWorkload {
/**
* This method is invoked when the workload is started.
+ *
+ * @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
-
- /**
- * Obtain the resource consumer for the specified processing unit.
- */
- public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index 8886caa7..b15692ec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -31,15 +31,16 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for the [SimHypervisor] class.
@@ -93,8 +94,9 @@ internal class SimHypervisorTest {
),
)
- val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
+ val platform = SimResourceInterpreter(coroutineContext, clock)
+ val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
launch {
machine.run(hypervisor)
@@ -164,11 +166,11 @@ internal class SimHypervisorTest {
)
)
+ val platform = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, model, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
+ val hypervisor = SimFairShareHypervisor(platform, listener = listener)
launch {
machine.run(hypervisor)
@@ -190,10 +192,33 @@ internal class SimHypervisorTest {
yield()
assertAll(
- { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") },
{ assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
+
+ @Test
+ fun testMultipleCPUs() = runBlockingSimulation {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val model = SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+
+ val platform = SimResourceInterpreter(coroutineContext, clock)
+ val machine = SimBareMetalMachine(
+ platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+ val hypervisor = SimFairShareHypervisor(platform)
+
+ assertDoesNotThrow {
+ launch {
+ machine.run(hypervisor)
+ }
+ }
+
+ machine.close()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 205f2eca..69f562d2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -29,14 +29,14 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for the [SimBareMetalMachine] class.
@@ -57,7 +57,11 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
@@ -76,7 +80,11 @@ class SimMachineTest {
cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
@@ -90,7 +98,11 @@ class SimMachineTest {
@Test
fun testUsage() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
val res = mutableListOf<Double>()
val job = launch { machine.usage.toList(res) }
@@ -99,7 +111,7 @@ class SimMachineTest {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
yield()
job.cancel()
- assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
+ assertEquals(listOf(0.0, 1.0, 0.0), res) { "Machine is fully utilized" }
} finally {
machine.close()
}
@@ -107,7 +119,11 @@ class SimMachineTest {
@Test
fun testClose() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
machine.close()
assertDoesNotThrow { machine.close() }
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index ef6f536d..dba3e9a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -30,16 +30,16 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A test suite for the [SimSpaceSharedHypervisor].
@@ -76,11 +76,11 @@ internal class SimSpaceSharedHypervisorTest {
),
)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
val colA = launch { machine.usage.toList(usagePm) }
launch { machine.run(hypervisor) }
@@ -112,11 +112,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -135,11 +135,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -156,11 +156,11 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runBlockingSimulation {
val duration = 5 * 60L * 1000
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -182,11 +182,11 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -206,11 +206,11 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runBlockingSimulation {
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt
index c482d348..8e8b09c8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt
@@ -26,23 +26,24 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.Test
+import org.opendc.simulator.compute.SimProcessingUnit
/**
- * Test suite for the [DemandScalingGovernor]
+ * Test suite for the [PerformanceScalingGovernor]
*/
-internal class DemandScalingGovernorTest {
+internal class PerformanceScalingGovernorTest {
@Test
- fun testSetDemandLimit() {
- val ctx = mockk<ScalingContext>(relaxUnitFun = true)
+ fun testSetStartLimit() {
+ val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true)
- every { ctx.cpu.speed } returns 2100.0
+ every { cpu.model.frequency } returns 4100.0
+ every { cpu.speed } returns 2100.0
- val logic = DemandScalingGovernor().createLogic(ctx)
+ val logic = PerformanceScalingGovernor().createLogic(cpu)
logic.onStart()
- verify(exactly = 0) { ctx.setTarget(any()) }
+ logic.onLimit(1.0)
- logic.onLimit()
- verify(exactly = 1) { ctx.setTarget(2100.0) }
+ verify(exactly = 1) { cpu.capacity = 4100.0 }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
index bbea3ee2..35fd7c4c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import io.mockk.every
import io.mockk.mockk
@@ -28,18 +28,16 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.power.LinearPowerModel
/**
- * Test suite for [PStateScalingDriver].
+ * Test suite for [PStatePowerDriver].
*/
-internal class PStateScalingDriverTest {
+internal class PStatePowerDriverTest {
@Test
- fun testPowerWithoutGovernor() {
+ fun testPowerBaseline() {
val machine = mockk<SimBareMetalMachine>()
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -47,19 +45,19 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
+ val logic = driver.createLogic(machine, emptyList())
assertEquals(200.0, logic.computePower())
}
@Test
- fun testPowerWithSingleGovernor() {
+ fun testPowerWithSingleCpu() {
val machine = mockk<SimBareMetalMachine>()
val cpu = mockk<SimProcessingUnit>()
- every { cpu.model.frequency } returns 4100.0
+ every { cpu.capacity } returns 3200.0
every { cpu.speed } returns 1200.0
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -67,23 +65,26 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContext = logic.createContext(cpu)
- scalingContext.setTarget(3200.0)
+ val logic = driver.createLogic(machine, listOf(cpu))
assertEquals(300.0, logic.computePower())
}
@Test
- fun testPowerWithMultipleGovernors() {
+ fun testPowerWithMultipleCpus() {
val machine = mockk<SimBareMetalMachine>()
- val cpu = mockk<SimProcessingUnit>()
+ val cpus = listOf(
+ mockk<SimProcessingUnit>(),
+ mockk()
+ )
- every { cpu.model.frequency } returns 4100.0
- every { cpu.speed } returns 1200.0
+ every { cpus[0].capacity } returns 1000.0
+ every { cpus[0].speed } returns 1200.0
- val driver = PStateScalingDriver(
+ every { cpus[1].capacity } returns 3500.0
+ every { cpus[1].speed } returns 1200.0
+
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -91,13 +92,7 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContextA = logic.createContext(cpu)
- scalingContextA.setTarget(1000.0)
-
- val scalingContextB = logic.createContext(cpu)
- scalingContextB.setTarget(3400.0)
+ val logic = driver.createLogic(machine, cpus)
assertEquals(350.0, logic.computePower())
}
@@ -109,7 +104,7 @@ internal class PStateScalingDriverTest {
every { cpu.model.frequency } returns 4200.0
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to LinearPowerModel(200.0, 100.0),
3300.0 to LinearPowerModel(250.0, 150.0),
@@ -117,16 +112,14 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContext = logic.createContext(cpu)
+ val logic = driver.createLogic(machine, listOf(cpu))
every { cpu.speed } returns 1400.0
- scalingContext.setTarget(1400.0)
+ every { cpu.capacity } returns 1400.0
assertEquals(150.0, logic.computePower())
every { cpu.speed } returns 1400.0
- scalingContext.setTarget(4000.0)
+ every { cpu.capacity } returns 4000.0
assertEquals(235.0, logic.computePower())
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index cd5f33bd..b45b2a2f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: SimResourceScheduler
+ private lateinit var interpreter: SimResourceInterpreter
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
+ interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
@@ -67,7 +67,7 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSource(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, scheduler)
+ val provider = SimResourceSource(4200.0, interpreter)
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -75,7 +75,7 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, scheduler)
+ val provider = SimResourceSource(4200.0, interpreter)
val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
@@ -85,12 +85,12 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val switch = SimResourceSwitchMaxMin(interpreter)
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -98,14 +98,14 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val switch = SimResourceSwitchMaxMin(interpreter)
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- repeat(3) { i ->
+ repeat(3) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -117,10 +117,10 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -130,12 +130,12 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
repeat(2) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 653b53e0..5fe7d7bb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -25,7 +25,10 @@ package org.opendc.simulator.resources
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
+public abstract class SimAbstractResourceAggregator(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?
+) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -39,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected abstract fun doFinish(cause: Throwable?)
+ protected abstract fun doFinish()
/**
* This method is invoked when an input context is started.
@@ -51,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
*/
protected abstract fun onInputFinished(input: Input)
+ /* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
- check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+ check(state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
@@ -60,42 +64,75 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
input.startConsumer(consumer)
}
- override fun close() {
- output.close()
- }
-
- override val output: SimResourceProvider
- get() = _output
- private val _output = SimResourceForwarder()
-
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
private val _inputConsumers = mutableListOf<Consumer>()
- protected val outputContext: SimResourceContext
- get() = context
- private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
- override val remainingWork: Double
- get() {
- val now = clock.millis()
-
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- _inputConsumers.sumOf { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it }
- } else {
- _remainingWork
+ /* SimResourceProvider */
+ override val state: SimResourceState
+ get() = _output.state
+
+ override val capacity: Double
+ get() = _output.capacity
+
+ override val speed: Double
+ get() = _output.speed
+
+ override val demand: Double
+ get() = _output.demand
+
+ override val counters: SimResourceCounters
+ get() = _output.counters
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ _output.startConsumer(consumer)
+ }
+
+ override fun cancel() {
+ _output.cancel()
+ }
+
+ override fun interrupt() {
+ _output.interrupt()
+ }
+
+ override fun close() {
+ _output.close()
+ }
+
+ private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ doIdle(deadline)
+ return Long.MAX_VALUE
}
- }
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
- override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ doConsume(work, limit, deadline)
+ return Long.MAX_VALUE
+ }
- override fun onIdle(deadline: Long) = doIdle(deadline)
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ doFinish()
+ }
- override fun onFinish() {
- doFinish(null)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return _inputConsumers.sumOf { it.remainingWork }
+ }
+ }
+ }
+
+ /**
+ * Flush the progress of the output if possible.
+ */
+ fun flush() {
+ ctx?.flush()
}
}
@@ -123,7 +160,13 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
*/
override val ctx: SimResourceContext
get() = _ctx!!
- var _ctx: SimResourceContext? = null
+ private var _ctx: SimResourceContext? = null
+
+ /**
+ * The remaining work of the consumer.
+ */
+ val remainingWork: Double
+ get() = _ctx?.remainingWork ?: 0.0
/**
* The resource command to run next.
@@ -132,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
private fun updateCapacity() {
// Adjust capacity of output resource
- context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
+ _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
}
/* Input */
@@ -149,7 +192,8 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
this.command = null
next
} else {
- context.flush(isIntermediate = true)
+ _output.flush()
+
next = command
this.command = null
next ?: SimResourceCommand.Idle()
@@ -162,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
_ctx = ctx
updateCapacity()
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
-
onInputStarted(this)
}
SimResourceEvent.Capacity -> updateCapacity()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
deleted file mode 100644
index c03bfad5..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import java.time.Clock
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
- */
-public abstract class SimAbstractResourceContext(
- initialCapacity: Double,
- private val scheduler: SimResourceScheduler,
- private val consumer: SimResourceConsumer
-) : SimResourceContext, SimResourceFlushable {
-
- /**
- * The clock of the context.
- */
- public override val clock: Clock
- get() = scheduler.clock
-
- /**
- * The capacity of the resource.
- */
- public final override var capacity: Double = initialCapacity
- set(value) {
- val oldValue = field
-
- // Only changes will be propagated
- if (value != oldValue) {
- field = value
- onCapacityChange()
- }
- }
-
- /**
- * The amount of work still remaining at this instant.
- */
- override val remainingWork: Double
- get() {
- val activeCommand = activeCommand ?: return 0.0
- val now = clock.millis()
-
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- computeRemainingWork(activeCommand, now).also { _remainingWork = it }
- } else {
- _remainingWork
- }
- }
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
-
- /**
- * A flag to indicate the state of the context.
- */
- public var state: SimResourceState = SimResourceState.Pending
- private set
-
- /**
- * The current processing speed of the resource.
- */
- final override var speed: Double = 0.0
- private set
-
- /**
- * This method is invoked when the resource will idle until the specified [deadline].
- */
- public abstract fun onIdle(deadline: Long)
-
- /**
- * This method is invoked when the resource will be consumed until the specified [work] was processed or the
- * [deadline] was reached.
- */
- public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
-
- /**
- * This method is invoked when the resource consumer has finished.
- */
- public abstract fun onFinish()
-
- /**
- * Get the remaining work to process after a resource consumption.
- *
- * @param work The size of the resource consumption.
- * @param speed The speed of consumption.
- * @param duration The duration from the start of the consumption until now.
- * @return The amount of work remaining.
- */
- protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
- return if (duration > 0L) {
- val processed = duration / 1000.0 * speed
- max(0.0, work - processed)
- } else {
- 0.0
- }
- }
-
- /**
- * Start the consumer.
- */
- public fun start() {
- check(state == SimResourceState.Pending) { "Consumer is already started" }
-
- val now = clock.millis()
-
- state = SimResourceState.Active
- isProcessing = true
- latestFlush = now
-
- try {
- consumer.onEvent(this, SimResourceEvent.Start)
- activeCommand = interpret(consumer.onNext(this), now)
- } catch (cause: Throwable) {
- doFail(cause)
- } finally {
- isProcessing = false
- }
- }
-
- /**
- * Immediately stop the consumer.
- */
- public fun stop() {
- try {
- isProcessing = true
- latestFlush = clock.millis()
-
- flush(isIntermediate = true)
- doStop()
- } finally {
- isProcessing = false
- }
- }
-
- override fun flush(isIntermediate: Boolean) {
- // Flush is no-op when the consumer is finished or not yet started
- if (state != SimResourceState.Active) {
- return
- }
-
- val now = clock.millis()
-
- // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
- if (isIntermediate && latestFlush >= now) {
- return
- }
-
- try {
- val activeCommand = activeCommand ?: return
- val (timestamp, command) = activeCommand
-
- // Note: accessor is reliant on activeCommand being set
- val remainingWork = remainingWork
-
- isProcessing = true
-
- val duration = now - timestamp
- assert(duration >= 0) { "Flush in the past" }
-
- this.activeCommand = when (command) {
- is SimResourceCommand.Idle -> {
- // We should only continue processing the next command if:
- // 1. The resource consumer reached its deadline.
- // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (command.deadline <= now || !isIntermediate) {
- next(now)
- } else {
- interpret(SimResourceCommand.Idle(command.deadline), now)
- }
- }
- is SimResourceCommand.Consume -> {
- // We should only continue processing the next command if:
- // 1. The resource consumption was finished.
- // 2. The resource capacity cannot satisfy the demand.
- // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
- next(now)
- } else {
- interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
- }
- }
- SimResourceCommand.Exit ->
- // Flush may not be called when the resource consumer has finished
- throw IllegalStateException()
- }
-
- // Flush remaining work cache
- _remainingWorkFlush = Long.MIN_VALUE
- } catch (cause: Throwable) {
- doFail(cause)
- } finally {
- latestFlush = now
- isProcessing = false
- }
- }
-
- override fun interrupt() {
- // Prevent users from interrupting the resource while they are constructing their next command, as this will
- // only lead to infinite recursion.
- if (isProcessing) {
- return
- }
-
- scheduler.schedule(this, isIntermediate = false)
- }
-
- override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
-
- /**
- * A flag to indicate that the resource is currently processing a command.
- */
- private var isProcessing: Boolean = false
-
- /**
- * The current command that is being processed.
- */
- private var activeCommand: CommandWrapper? = null
-
- /**
- * The latest timestamp at which the resource was flushed.
- */
- private var latestFlush: Long = Long.MIN_VALUE
-
- /**
- * Finish the consumer and resource provider.
- */
- private fun doStop() {
- val state = state
- this.state = SimResourceState.Stopped
-
- if (state == SimResourceState.Active) {
- activeCommand = null
- try {
- consumer.onEvent(this, SimResourceEvent.Exit)
- onFinish()
- } catch (cause: Throwable) {
- doFail(cause)
- }
- }
- }
-
- /**
- * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
- */
- private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
- when (command) {
- is SimResourceCommand.Idle -> {
- val deadline = command.deadline
-
- require(deadline >= now) { "Deadline already passed" }
-
- speed = 0.0
-
- onIdle(deadline)
- consumer.onEvent(this, SimResourceEvent.Run)
- }
- is SimResourceCommand.Consume -> {
- val work = command.work
- val limit = command.limit
- val deadline = command.deadline
-
- require(deadline >= now) { "Deadline already passed" }
-
- speed = min(capacity, limit)
- onConsume(work, limit, deadline)
- consumer.onEvent(this, SimResourceEvent.Run)
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
-
- doStop()
-
- // No need to set the next active command
- return null
- }
- }
-
- return CommandWrapper(now, command)
- }
-
- /**
- * Request the workload for more work.
- */
- private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
-
- /**
- * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
- */
- private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
- val (timestamp, command) = wrapper
- val duration = now - timestamp
- return when (command) {
- is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
- is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
- }
- }
-
- /**
- * Fail the resource consumer.
- */
- private fun doFail(cause: Throwable) {
- state = SimResourceState.Stopped
- activeCommand = null
-
- try {
- consumer.onFailure(this, cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- e.printStackTrace()
- }
-
- onFinish()
- }
-
- /**
- * Indicate that the capacity of the resource has changed.
- */
- private fun onCapacityChange() {
- // Do not inform the consumer if it has not been started yet
- if (state != SimResourceState.Active) {
- return
- }
-
- val isThrottled = speed > capacity
-
- consumer.onEvent(this, SimResourceEvent.Capacity)
-
- // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
- // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
- if (isThrottled) {
- flush(isIntermediate = true)
- }
- }
-
- /**
- * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
- */
- private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
new file mode 100644
index 00000000..de26f99e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
+/**
+ * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
+ */
+public abstract class SimAbstractResourceProvider(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem?,
+ initialCapacity: Double
+) : SimResourceProvider {
+ /**
+ * The capacity of the resource.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
+ /**
+ * The [SimResourceControllableContext] that is currently running.
+ */
+ protected var ctx: SimResourceControllableContext? = null
+ private set
+
+ /**
+ * The state of the resource provider.
+ */
+ final override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ /**
+ * Construct the [SimResourceProviderLogic] instance for a new consumer.
+ */
+ protected abstract fun createLogic(): SimResourceProviderLogic
+
+ /**
+ * Start the specified [SimResourceControllableContext].
+ */
+ protected open fun start(ctx: SimResourceControllableContext) {
+ ctx.start()
+ }
+
+ /**
+ * Update the counters of the resource provider.
+ */
+ protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += work
+ counters.actual += work - remainingWork
+ counters.overcommit += remainingWork
+ }
+
+ final override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = interpreter.newContext(consumer, createLogic(), parent)
+
+ ctx.capacity = capacity
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+
+ start(ctx)
+ }
+
+ override fun close() {
+ cancel()
+ state = SimResourceState.Stopped
+ }
+
+ final override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ final override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.close()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
+ override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
index bb4e6a2c..00972f43 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -25,24 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
*/
-public interface SimResourceAggregator : AutoCloseable {
- /**
- * The output resource provider to which resource consumers can be attached.
- */
- public val output: SimResourceProvider
-
+public interface SimResourceAggregator : SimResourceProvider {
/**
* The input resources that will be switched between the output providers.
*/
public val inputs: Set<SimResourceProvider>
/**
- * Add the specified [input] to the switch.
+ * Add the specified [input] to the aggregator.
*/
public fun addInput(input: SimResourceProvider)
-
- /**
- * End the lifecycle of the aggregator.
- */
- public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 5665abd1..c39c1aca 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -25,7 +25,10 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
+public class SimResourceAggregatorMaxMin(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null
+) : SimAbstractResourceAggregator(interpreter, parent) {
private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
@@ -35,7 +38,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
val inputCapacity = input.ctx.capacity
- val fraction = inputCapacity / outputContext.capacity
+ val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
@@ -53,7 +56,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA
}
}
- override fun doFinish(cause: Throwable?) {
+ override fun doFinish() {
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 7c76c634..0d9a6106 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -45,6 +45,11 @@ public interface SimResourceContext {
public val speed: Double
/**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
index cf0bbb28..ceaca39a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
@@ -20,30 +20,45 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
-
-import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.PowerModel
+package org.opendc.simulator.resources
/**
- * A [ScalingDriver] that ignores the instructions of the [ScalingGovernor] and directly computes the power consumption
- * based on the specified [power model][model].
+ * A controllable [SimResourceContext].
+ *
+ * This interface is used by resource providers to control the resource context.
*/
-public class SimpleScalingDriver(private val model: PowerModel) : ScalingDriver {
- override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic {
- override fun createContext(cpu: SimProcessingUnit): ScalingContext {
- return object : ScalingContext {
- override val machine: SimMachine = machine
+public interface SimResourceControllableContext : SimResourceContext, AutoCloseable {
+ /**
+ * The state of the resource context.
+ */
+ public val state: SimResourceState
+
+ /**
+ * The capacity of the resource.
+ */
+ public override var capacity: Double
- override val cpu: SimProcessingUnit = cpu
+ /**
+ * Start the resource context.
+ */
+ public fun start()
- override fun setTarget(freq: Double) {}
- }
- }
+ /**
+ * Stop the resource context.
+ */
+ public override fun close()
- override fun computePower(): Double = model.computePower(machine.usage.value)
+ /**
+ * Invalidate the resource context's state.
+ *
+ * By invalidating the resource context's current state, the state is re-computed and the current progress is
+ * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the
+ * synchronous variant.
+ */
+ public fun invalidate()
- override fun toString(): String = "SimpleScalingDriver.Logic"
- }
+ /**
+ * Synchronously flush the progress of the resource context and materialize its current progress.
+ */
+ public fun flush()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
index f6a1a42e..725aa5bc 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
@@ -23,15 +23,26 @@
package org.opendc.simulator.resources
/**
- * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
+ * An interface that tracks cumulative counts of the work performed by a resource.
*/
-public interface SimResourceFlushable {
+public interface SimResourceCounters {
/**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
+ * The amount of work that resource consumers wanted the resource to perform.
*/
- public fun flush(isIntermediate: Boolean)
+ public val demand: Double
+
+ /**
+ * The amount of work performed by the resource.
+ */
+ public val actual: Double
+
+ /**
+ * The amount of work that could not be completed due to overcommitted resources.
+ */
+ public val overcommit: Double
+
+ /**
+ * Reset the resource counters.
+ */
+ public fun reset()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index b2759b7f..e0333ff9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -25,19 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
-public interface SimResourceDistributor : AutoCloseable {
+public interface SimResourceDistributor : SimResourceConsumer {
/**
* The output resource providers to which resource consumers can be attached.
*/
public val outputs: Set<SimResourceProvider>
/**
- * The input resource that will be distributed over the consumers.
+ * Create a new output for the distributor.
*/
- public val input: SimResourceProvider
-
- /**
- * Add an output to the switch with the specified [capacity].
- */
- public fun addOutput(capacity: Double): SimResourceProvider
+ public fun newOutput(): SimResourceProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index a76cb1e3..be9e89fb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -29,23 +29,22 @@ import kotlin.math.min
* A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
*/
public class SimResourceDistributorMaxMin(
- override val input: SimResourceProvider,
- private val scheduler: SimResourceScheduler,
- private val listener: Listener? = null
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val _outputs = mutableSetOf<OutputProvider>()
+ private val _outputs = mutableSetOf<Output>()
/**
- * The active output contexts.
+ * The resource context of the consumer.
*/
- private val outputContexts: MutableList<OutputContext> = mutableListOf()
+ private var ctx: SimResourceContext? = null
/**
- * The total speed requested by the output resources.
+ * The active outputs.
*/
- private var totalRequestedSpeed = 0.0
+ private val activeOutputs: MutableList<Output> = mutableListOf()
/**
* The total amount of work requested by the output resources.
@@ -57,147 +56,83 @@ public class SimResourceDistributorMaxMin(
*/
private var totalAllocatedSpeed = 0.0
- /**
- * The total allocated work requested for the output resources.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private val consumer = object : SimResourceConsumer {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext
-
- val remainingWork: Double
- get() = ctx.remainingWork
-
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx.capacity)
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- else -> {}
- }
- }
- }
-
- /**
- * The total amount of remaining work.
- */
- private val totalRemainingWork: Double
- get() = consumer.remainingWork
-
- override fun addOutput(capacity: Double): SimResourceProvider {
- check(!isClosed) { "Distributor has been closed" }
-
- val provider = OutputProvider(capacity)
+ /* SimResourceDistributor */
+ override fun newOutput(): SimResourceProvider {
+ val provider = Output(ctx?.capacity ?: 0.0)
_outputs.add(provider)
return provider
}
- override fun close() {
- if (!isClosed) {
- isClosed = true
- input.cancel()
- }
+ /* SimResourceConsumer */
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
}
- init {
- input.startConsumer(consumer)
- }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ updateCapacity(ctx)
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- /**
- * Indicate that the workloads should be re-scheduled.
- */
- private fun schedule() {
- input.interrupt()
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ SimResourceEvent.Capacity -> updateCapacity(ctx)
+ else -> {}
+ }
}
/**
- * Schedule the work over the physical CPUs.
+ * Schedule the work of the outputs.
*/
- private fun doSchedule(capacity: Double): SimResourceCommand {
- // If there is no work yet, mark all inputs as idle.
- if (outputContexts.isEmpty()) {
+ private fun doNext(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark the input as idle.
+ if (activeOutputs.isEmpty()) {
return SimResourceCommand.Idle()
}
- val maxUsage = capacity
var duration: Double = Double.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
+ var availableSpeed = capacity
var totalRequestedSpeed = 0.0
var totalRequestedWork = 0.0
- // Flush the work of the outputs
- var outputIterator = outputContexts.listIterator()
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
-
- output.flush(isIntermediate = true)
+ // Pull in the work of the outputs
+ val outputIterator = activeOutputs.listIterator()
+ for (output in outputIterator) {
+ output.pull()
- if (output.activeCommand == SimResourceCommand.Exit) {
- // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ // Remove outputs that have finished
+ if (output.isFinished) {
outputIterator.remove()
}
}
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- outputContexts.sort()
+ // Sort in-place the outputs based on their requested usage.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeOutputs.sort()
// Divide the available input capacity fairly across the outputs using max-min fair sharing
- outputIterator = outputContexts.listIterator()
- var remaining = outputContexts.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
+ var remaining = activeOutputs.size
+ for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
when (val command = output.activeCommand) {
is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
-
output.actualSpeed = 0.0
}
is SimResourceCommand.Consume -> {
val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
// Ignore idle computation
@@ -212,216 +147,139 @@ public class SimResourceDistributorMaxMin(
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
- // The duration that we want to run is that of the shortest request from an output
+ // The duration that we want to run is that of the shortest request of an output
duration = min(duration, command.work / grantedSpeed)
}
SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
}
}
- assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
+ assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = maxUsage - availableSpeed
- this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+ this.totalAllocatedSpeed = capacity - availableSpeed
+ val totalAllocatedWork = min(
+ totalRequestedWork,
+ totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
+ )
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
- /**
- * Obtain the next command to perform.
- */
- private fun doNext(capacity: Double): SimResourceCommand {
- val totalRequestedWork = totalRequestedWork.toLong()
- val totalRemainingWork = totalRemainingWork.toLong()
- val totalAllocatedWork = totalAllocatedWork.toLong()
- val totalRequestedSpeed = totalRequestedSpeed
- val totalAllocatedSpeed = totalAllocatedSpeed
-
- // Force all inputs to re-schedule their work.
- val command = doSchedule(capacity)
-
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork,
- totalAllocatedWork - totalRemainingWork,
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalAllocatedSpeed,
- totalRequestedSpeed
- )
-
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- return command
+ private fun updateCapacity(ctx: SimResourceContext) {
+ for (output in _outputs) {
+ output.capacity = ctx.capacity
+ }
}
/**
- * Event listener for hypervisor events.
+ * An internal [SimResourceProvider] implementation for switch outputs.
*/
- public interface Listener {
+ private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> {
/**
- * This method is invoked when a slice is finished.
+ * The current command that is processed by the resource.
*/
- public fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
/**
- * The [OutputContext] that is currently running.
+ * The processing speed that is allowed by the model constraints.
*/
- private var ctx: OutputContext? = null
+ var allowedSpeed: Double = 0.0
- override var state: SimResourceState = SimResourceState.Pending
- internal set
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+ /**
+ * A flag to indicate that the output is finished.
+ */
+ val isFinished
+ get() = activeCommand is SimResourceCommand.Exit
- val ctx = OutputContext(this, consumer)
- this.ctx = ctx
- this.state = SimResourceState.Active
- outputContexts += ctx
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var lastCommandTimestamp: Long = Long.MIN_VALUE
- ctx.start()
- schedule()
- }
+ /* SimAbstractResourceProvider */
+ override fun createLogic(): SimResourceProviderLogic = this
- override fun close() {
- cancel()
+ override fun start(ctx: SimResourceControllableContext) {
+ activeOutputs += this
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Stopped
- _outputs.remove(this)
+ interpreter.batch {
+ ctx.start()
+ // Interrupt the input to re-schedule the resources
+ this@SimResourceDistributorMaxMin.ctx?.interrupt()
}
}
- override fun interrupt() {
- ctx?.interrupt()
- }
+ override fun close() {
+ val state = state
- override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.stop()
- }
+ super.close()
if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
+ _outputs.remove(this)
}
}
- }
-
- /**
- * A [SimAbstractResourceContext] for the output resources.
- */
- private inner class OutputContext(
- private val provider: OutputProvider,
- consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- var actualSpeed: Double = 0.0
-
- private fun reportOvercommit() {
- val remainingWork = remainingWork
- totalOvercommittedWork += remainingWork
- }
-
- override fun onIdle(deadline: Long) {
- reportOvercommit()
+ /* SimResourceProviderLogic */
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
- }
+ lastCommandTimestamp = ctx.clock.millis()
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- reportOvercommit()
+ return Long.MAX_VALUE
+ }
- allowedSpeed = speed
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ allowedSpeed = ctx.speed
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ lastCommandTimestamp = ctx.clock.millis()
+
+ return Long.MAX_VALUE
}
- override fun onFinish() {
- reportOvercommit()
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+ override fun onFinish(ctx: SimResourceControllableContext) {
activeCommand = SimResourceCommand.Exit
- provider.cancel()
+ lastCommandTimestamp = ctx.clock.millis()
}
- override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
- // Apply performance interference model
- val performanceScore = 1.0
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
- // Compute the remaining amount of work
return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
+ // Compute the fraction of compute time allocated to the output
val fraction = actualSpeed / totalAllocatedSpeed
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
+ // Compute the work that was actually granted to the output.
+ val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction
+ max(0.0, work - processingAvailable)
} else {
0.0
}
}
- private var isProcessing: Boolean = false
-
- override fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isProcessing) {
- return
- }
-
- try {
- isProcessing = false
+ /* Comparable */
+ override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed)
- super.interrupt()
-
- // Force the scheduler to re-schedule
- schedule()
- } finally {
- isProcessing = true
+ /**
+ * Pull the next command if necessary.
+ */
+ fun pull() {
+ val ctx = ctx
+ if (ctx != null && lastCommandTimestamp < ctx.clock.millis()) {
+ ctx.flush()
}
}
-
- override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
new file mode 100644
index 00000000..82631377
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * The resource interpreter is responsible for managing the interaction between resource consumer and provider.
+ *
+ * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public interface SimResourceInterpreter {
+ /**
+ * The [Clock] associated with this interpreter.
+ */
+ public val clock: Clock
+
+ /**
+ * Create a new [SimResourceControllableContext] with the given [provider].
+ *
+ * @param consumer The consumer logic.
+ * @param provider The logic of the resource provider.
+ * @param parent The system to which the resource context belongs.
+ */
+ public fun newContext(
+ consumer: SimResourceConsumer,
+ provider: SimResourceProviderLogic,
+ parent: SimResourceSystem? = null
+ ): SimResourceControllableContext
+
+ /**
+ * Start batching the execution of resource updates until [popBatch] is called.
+ *
+ * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
+ * simultaneously) in a single state update.
+ *
+ * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
+ * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
+ * the same amount of times. To simplify batching, see [batch].
+ */
+ public fun pushBatch()
+
+ /**
+ * Stop the batching of resource updates and run the interpreter on the batch.
+ *
+ * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
+ */
+ public fun popBatch()
+
+ public companion object {
+ /**
+ * Construct a new [SimResourceInterpreter] implementation.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+ @JvmName("create")
+ public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter {
+ return SimResourceInterpreterImpl(context, clock)
+ }
+ }
+}
+
+/**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+public inline fun SimResourceInterpreter.batch(block: () -> Unit) {
+ try {
+ pushBatch()
+ block()
+ } finally {
+ popBatch()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 2f567a5e..f709ca17 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -27,7 +27,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides some resource of type [R].
+ * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
*/
public interface SimResourceProvider : AutoCloseable {
/**
@@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable {
public val state: SimResourceState
/**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public val counters: SimResourceCounters
+
+ /**
* Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
*
* @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
new file mode 100644
index 00000000..5231ecf5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlin.math.max
+
+/**
+ * The logic of a resource provider.
+ */
+public interface SimResourceProviderLogic {
+ /**
+ * This method is invoked when the resource is reported to idle until the specified [deadline].
+ *
+ * @param ctx The context in which the provider runs.
+ * @param deadline The deadline that was requested by the resource consumer.
+ * @return The instant at which to resume the consumer.
+ */
+ public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified amount of [work] was processed
+ * or [deadline] is reached.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ * @param limit The limit on the work rate of the resource consumer.
+ * @param deadline The deadline that was requested by the resource consumer.
+ * @return The instant at which to resume the consumer.
+ */
+ public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long
+
+ /**
+ * This method is invoked when the progress of the resource consumer is materialized.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ */
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public fun onFinish(ctx: SimResourceControllableContext)
+
+ /**
+ * Get the remaining work to process after a resource consumption.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @return The amount of work remaining.
+ */
+ public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
deleted file mode 100644
index a228c47b..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import java.time.Clock
-
-/**
- * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
- * providers and consumers.
- *
- * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
- * efficiently, reducing the overall work needed per update.
- */
-public interface SimResourceScheduler {
- /**
- * The [Clock] associated with this scheduler.
- */
- public val clock: Clock
-
- /**
- * Schedule a direct interrupt for the resource context represented by [flushable].
- *
- * @param flushable The resource context that needs to be flushed.
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
-
- /**
- * Schedule an interrupt in the future for the resource context represented by [flushable].
- *
- * This method will override earlier calls to this method for the same [flushable].
- *
- * @param flushable The resource context that needs to be flushed.
- * @param timestamp The timestamp when the interrupt should happen.
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
-
- /**
- * Batch the execution of several interrupts into a single call.
- *
- * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
- */
- public fun batch(block: () -> Unit)
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
deleted file mode 100644
index cdbb4a6c..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
- *
- * @param clock The virtual simulation clock.
- */
-public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
- /**
- * The [TimerScheduler] to actually schedule the interrupts.
- */
- private val timers = TimerScheduler<Any>(context, clock)
-
- /**
- * A flag to indicate that an interrupt is currently running already.
- */
- private var isRunning: Boolean = false
-
- /**
- * The queue of resources to be flushed.
- */
- private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
-
- override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
- queue.add(flushable to isIntermediate)
-
- if (isRunning) {
- return
- }
-
- flush()
- }
-
- override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
- timers.startSingleTimerTo(flushable, timestamp) {
- schedule(flushable, isIntermediate)
- }
- }
-
- override fun batch(block: () -> Unit) {
- val wasAlreadyRunning = isRunning
- try {
- isRunning = true
- block()
- } finally {
- if (!wasAlreadyRunning) {
- isRunning = false
- }
- }
- }
-
- /**
- * Flush the scheduled queue.
- */
- private fun flush() {
- val visited = mutableSetOf<SimResourceFlushable>()
- try {
- isRunning = true
- while (queue.isNotEmpty()) {
- val (flushable, isIntermediate) = queue.poll()
- flushable.flush(isIntermediate)
- visited.add(flushable)
- }
- } finally {
- isRunning = false
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 3277b889..9f062cc3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -26,98 +26,39 @@ import kotlin.math.ceil
import kotlin.math.min
/**
- * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param scheduler The scheduler to schedule the interrupts.
+ * @param interpreter The interpreter that is used for managing the resource contexts.
+ * @param parent The parent resource system.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val scheduler: SimResourceScheduler
-) : SimResourceProvider {
- /**
- * The current processing speed of the resource.
- */
- public val speed: Double
- get() = ctx?.speed ?: 0.0
-
- /**
- * The capacity of the resource.
- */
- public var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
- /**
- * The [Context] that is currently running.
- */
- private var ctx: Context? = null
-
- override var state: SimResourceState = SimResourceState.Pending
- private set
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource is in invalid state" }
- val ctx = Context(consumer)
-
- this.ctx = ctx
- this.state = SimResourceState.Active
-
- ctx.start()
- }
-
- override fun close() {
- cancel()
- state = SimResourceState.Stopped
- }
-
- override fun interrupt() {
- ctx?.interrupt()
- }
-
- override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.stop()
- }
-
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
- }
- }
-
- /**
- * Internal implementation of [SimResourceContext] for this class.
- */
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
- override fun onIdle(deadline: Long) {
- // Do not resume if deadline is "infinite"
- if (deadline != Long.MAX_VALUE) {
- scheduler.schedule(this, deadline)
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null
+) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ return deadline
}
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- val until = min(deadline, clock.millis() + getDuration(work, speed))
- scheduler.schedule(this, until)
- }
- override fun onFinish() {
- cancel()
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ return min(deadline, ctx.clock.millis() + getDuration(work, speed))
+ }
- ctx = null
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
- if (this@SimResourceSource.state != SimResourceState.Stopped) {
- this@SimResourceSource.state = SimResourceState.Pending
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ cancel()
}
}
-
- override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
}
+ override fun toString(): String = "SimResourceSource[capacity=$capacity]"
+
/**
* Compute the duration that a resource consumption will take with the specified [speed].
*/
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index 53fec16a..e224285e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable {
public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch with the specified [capacity].
+ * The resource counters to track the execution metrics of all switch resources.
*/
- public fun addOutput(capacity: Double): SimResourceProvider
+ public val counters: SimResourceCounters
+
+ /**
+ * Create a new output on the switch.
+ */
+ public fun newOutput(): SimResourceProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 1a9dd0bc..2950af80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override val counters: SimResourceCounters = object : SimResourceCounters {
+ override val demand: Double
+ get() = _inputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _inputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _inputs.sumOf { it.counters.overcommit }
+
+ override fun reset() {
+ for (input in _inputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(capacity, forwarder)
+ val output = Provider(forwarder)
_outputs += output
return output
}
@@ -84,13 +101,9 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
_inputs.forEach(SimResourceProvider::cancel)
}
- private inner class Provider(
- private val capacity: Double,
- private val forwarder: SimResourceTransformer
- ) : SimResourceProvider by forwarder {
+ private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
override fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
-
_outputs -= this
availableResources += forwarder
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 5dc1e68d..684a1b52 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -22,23 +22,31 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.*
-
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- scheduler: SimResourceScheduler,
- private val listener: Listener? = null
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null
) : SimResourceSwitch {
- private val _outputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
override val outputs: Set<SimResourceProvider>
- get() = _outputs
+ get() = distributor.outputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The input resources that will be switched between the output providers.
+ */
override val inputs: Set<SimResourceProvider>
- get() = _inputs
+ get() = aggregator.inputs
+
+ /**
+ * The resource counters to track the execution metrics of all switch resources.
+ */
+ override val counters: SimResourceCounters
+ get() = aggregator.counters
/**
* A flag to indicate that the switch was closed.
@@ -48,37 +56,24 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent)
/**
* The distributor to distribute the aggregated resources.
*/
- private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, scheduler,
- object : SimResourceDistributorMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
- }
- )
+ private val distributor = SimResourceDistributorMaxMin(interpreter, parent)
+
+ init {
+ aggregator.startConsumer(distributor)
+ }
/**
- * Add an output to the switch represented by [resource].
+ * Add an output to the switch.
*/
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
- val provider = distributor.addOutput(capacity)
- _outputs.add(provider)
- return provider
+ return distributor.newOutput()
}
/**
@@ -93,26 +88,7 @@ public class SimResourceSwitchMaxMin(
override fun close() {
if (!isClosed) {
isClosed = true
- distributor.close()
aggregator.close()
}
}
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
new file mode 100644
index 00000000..609262cb
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A system of possible multiple sub-resources.
+ *
+ * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
+ * resource provider.
+ */
+public interface SimResourceSystem {
+ /**
+ * The parent system to which this system belongs or `null` if it has no parent.
+ */
+ public val parent: SimResourceSystem?
+
+ /**
+ * This method is invoked when the system has converged to a steady-state.
+ *
+ * @param timestamp The timestamp at which the system converged.
+ */
+ public fun onConverge(timestamp: Long)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 32f3f573..fd3d1230 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
*
@@ -53,6 +55,19 @@ public class SimResourceTransformer(
override var state: SimResourceState = SimResourceState.Pending
private set
+ override val capacity: Double
+ get() = ctx?.capacity ?: 0.0
+
+ override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
@@ -97,10 +112,15 @@ public class SimResourceTransformer(
start()
}
+ updateCounters(ctx)
+
return if (state == SimResourceState.Stopped) {
SimResourceCommand.Exit
} else if (delegate != null) {
val command = transform(ctx, delegate.onNext(ctx))
+
+ _work = if (command is SimResourceCommand.Consume) command.work else 0.0
+
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
@@ -169,6 +189,22 @@ public class SimResourceTransformer(
state = SimResourceState.Pending
}
}
+
+ /**
+ * Counter to track the current submitted work.
+ */
+ private var _work = 0.0
+
+ /**
+ * Update the resource counters for the transformer.
+ */
+ private fun updateCounters(ctx: SimResourceContext) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += _work
+ counters.actual += _work - remainingWork
+ counters.overcommit += remainingWork
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
new file mode 100644
index 00000000..46c5c63f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -0,0 +1,422 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.*
+import java.time.Clock
+import kotlin.math.min
+
+/**
+ * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+internal class SimResourceContextImpl(
+ override val parent: SimResourceSystem?,
+ private val interpreter: SimResourceInterpreterImpl,
+ private val consumer: SimResourceConsumer,
+ private val logic: SimResourceProviderLogic
+) : SimResourceControllableContext, SimResourceSystem {
+ /**
+ * The clock of the context.
+ */
+ override val clock: Clock
+ get() = interpreter.clock
+
+ /**
+ * The capacity of the resource.
+ */
+ override var capacity: Double = 0.0
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ override val remainingWork: Double
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ /**
+ * A flag to indicate the state of the context.
+ */
+ override val state: SimResourceState
+ get() = _state
+ private var _state = SimResourceState.Pending
+
+ /**
+ * The current processing speed of the resource.
+ */
+ override val speed: Double
+ get() = _speed
+ private var _speed = 0.0
+
+ /**
+ * The current resource processing demand.
+ */
+ override val demand: Double
+ get() = _limit
+
+ private val counters = object : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ /**
+ * The current state of the resource context.
+ */
+ private var _timestamp: Long = Long.MIN_VALUE
+ private var _work: Double = 0.0
+ private var _limit: Double = 0.0
+ private var _deadline: Long = Long.MAX_VALUE
+
+ /**
+ * The update flag indicating why the update was triggered.
+ */
+ private var _flag: Flag = Flag.None
+
+ /**
+ * The current pending update.
+ */
+ private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null
+
+ override fun start() {
+ check(_state == SimResourceState.Pending) { "Consumer is already started" }
+ interpreter.batch {
+ consumer.onEvent(this, SimResourceEvent.Start)
+ _state = SimResourceState.Active
+ interrupt()
+ }
+ }
+
+ override fun close() {
+ if (_state != SimResourceState.Stopped) {
+ interpreter.batch {
+ _state = SimResourceState.Stopped
+ doStop()
+ }
+ }
+ }
+
+ override fun interrupt() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ enableFlag(Flag.Interrupt)
+ scheduleUpdate()
+ }
+
+ override fun invalidate() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ enableFlag(Flag.Invalidate)
+ scheduleUpdate()
+ }
+
+ override fun flush() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ interpreter.scheduleSync(this)
+ }
+
+ /**
+ * Determine whether the state of the resource context should be updated.
+ */
+ fun requiresUpdate(timestamp: Long): Boolean {
+ // Either the resource context is flagged or there is a pending update at this timestamp
+ return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp
+ }
+
+ /**
+ * Update the state of the resource context.
+ */
+ fun doUpdate(timestamp: Long) {
+ try {
+ val oldState = _state
+ val newState = doUpdate(timestamp, oldState)
+
+ _state = newState
+ _flag = Flag.None
+
+ when (newState) {
+ SimResourceState.Pending ->
+ if (oldState != SimResourceState.Pending) {
+ throw IllegalStateException("Illegal transition to pending state")
+ }
+ SimResourceState.Stopped ->
+ if (oldState != SimResourceState.Stopped) {
+ doStop()
+ }
+ else -> {}
+ }
+ } catch (cause: Throwable) {
+ doFail(cause)
+ } finally {
+ _remainingWorkFlush = Long.MIN_VALUE
+ _timestamp = timestamp
+ }
+ }
+
+ override fun onConverge(timestamp: Long) {
+ if (_state == SimResourceState.Active) {
+ consumer.onEvent(this, SimResourceEvent.Run)
+ }
+ }
+
+ override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
+
+ /**
+ * Update the state of the resource context.
+ */
+ private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState {
+ return when (state) {
+ // Resource context is not active, so its state will not update
+ SimResourceState.Pending, SimResourceState.Stopped -> state
+ SimResourceState.Active -> {
+ val isInterrupted = _flag == Flag.Interrupt
+ val remainingWork = remainingWork
+ val isConsume = _limit > 0.0
+
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, _work)
+ }
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource capacity cannot satisfy the demand.
+ // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) {
+ next(timestamp)
+ } else if (isConsume) {
+ interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp)
+ } else {
+ interpret(SimResourceCommand.Idle(_deadline), timestamp)
+ }
+ }
+ }
+ }
+
+ /**
+ * Stop the resource context.
+ */
+ private fun doStop() {
+ try {
+ consumer.onEvent(this, SimResourceEvent.Exit)
+ logic.onFinish(this)
+ } catch (cause: Throwable) {
+ doFail(cause)
+ }
+ }
+
+ /**
+ * Fail the resource consumer.
+ */
+ private fun doFail(cause: Throwable) {
+ try {
+ consumer.onFailure(this, cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ e.printStackTrace()
+ }
+
+ logic.onFinish(this)
+ }
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand, now: Long): SimResourceState {
+ return when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = deadline
+
+ val timestamp = logic.onIdle(this, deadline)
+ scheduleUpdate(timestamp)
+
+ SimResourceState.Active
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ _speed = min(capacity, limit)
+ _work = work
+ _limit = limit
+ _deadline = deadline
+
+ val timestamp = logic.onConsume(this, work, limit, deadline)
+ scheduleUpdate(timestamp)
+
+ SimResourceState.Active
+ }
+ is SimResourceCommand.Exit -> {
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = Long.MAX_VALUE
+
+ SimResourceState.Stopped
+ }
+ }
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the current state.
+ */
+ private fun computeRemainingWork(now: Long): Double {
+ return if (_work > 0.0)
+ logic.getRemainingWork(this, _work, speed, now - _timestamp)
+ else 0.0
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+
+ interpreter.batch {
+ // Inform the consumer of the capacity change. This might already trigger an interrupt.
+ consumer.onEvent(this, SimResourceEvent.Capacity)
+
+ // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command.
+ if (isThrottled) {
+ invalidate()
+ }
+ }
+ }
+
+ /**
+ * Enable the specified [flag] taking into account precedence.
+ */
+ private fun enableFlag(flag: Flag) {
+ _flag = when (_flag) {
+ Flag.None -> flag
+ Flag.Invalidate ->
+ when (flag) {
+ Flag.None -> flag
+ else -> flag
+ }
+ Flag.Interrupt ->
+ when (flag) {
+ Flag.None, Flag.Invalidate -> flag
+ else -> flag
+ }
+ }
+ }
+
+ /**
+ * Schedule an update for this resource context.
+ */
+ private fun scheduleUpdate() {
+ // Cancel the pending update
+ val pendingUpdate = _pendingUpdate
+ if (pendingUpdate != null) {
+ _pendingUpdate = null
+ pendingUpdate.cancel()
+ }
+
+ interpreter.scheduleImmediate(this)
+ }
+
+ /**
+ * Schedule a delayed update for this resource context.
+ */
+ private fun scheduleUpdate(timestamp: Long) {
+ val pendingUpdate = _pendingUpdate
+ if (pendingUpdate != null) {
+ if (pendingUpdate.timestamp == timestamp) {
+ // Fast-path: A pending update for the same timestamp already exists
+ return
+ } else {
+ // Cancel the old pending update
+ _pendingUpdate = null
+ pendingUpdate.cancel()
+ }
+ }
+
+ if (timestamp != Long.MAX_VALUE) {
+ _pendingUpdate = interpreter.scheduleDelayed(this, timestamp)
+ }
+ }
+
+ /**
+ * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or
+ * interrupted.
+ */
+ enum class Flag {
+ None,
+ Interrupt,
+ Invalidate
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
index ddbe1ca0..827019c5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
@@ -20,17 +20,23 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.SimResourceCounters
/**
- * A CPUFreq [ScalingGovernor] that requests the frequency based on the utilization of the machine.
+ * Mutable implementation of the [SimResourceCounters] interface.
*/
-public class DemandScalingGovernor : ScalingGovernor {
- override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
- override fun onLimit() {
- ctx.setTarget(ctx.cpu.speed)
- }
+internal class SimResourceCountersImpl : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
- override fun toString(): String = "DemandScalingGovernor.Logic"
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
}
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
new file mode 100644
index 00000000..cb0d6160
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -0,0 +1,331 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import org.opendc.simulator.resources.*
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter {
+ /**
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The queue of resource updates that are scheduled for immediate execution.
+ */
+ private val queue = ArrayDeque<Update>()
+
+ /**
+ * A priority queue containing the resource updates to be scheduled in the future.
+ */
+ private val futureQueue = PriorityQueue<Update>()
+
+ /**
+ * The stack of interpreter invocations to occur in the future.
+ */
+ private val futureInvocations = ArrayDeque<Invocation>()
+
+ /**
+ * The systems that have been visited during the interpreter cycle.
+ */
+ private val visited = linkedSetOf<SimResourceSystem>()
+
+ /**
+ * The index in the batch stack.
+ */
+ private var batchIndex = 0
+
+ /**
+ * A flag to indicate that the interpreter is currently active.
+ */
+ private val isRunning: Boolean
+ get() = batchIndex > 0
+
+ /**
+ * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
+ *
+ * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
+ * re-computed. In case no interpreter is currently active, the interpreter will be started.
+ */
+ fun scheduleImmediate(ctx: SimResourceContextImpl) {
+ queue.add(Update(ctx, Long.MIN_VALUE))
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(ctx: SimResourceContextImpl) {
+ ctx.doUpdate(clock.millis())
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Schedule the interpreter to run at [timestamp] to update the resource contexts.
+ *
+ * This method will override earlier calls to this method for the same [ctx].
+ *
+ * @param ctx The resource context to which the event applies.
+ * @param timestamp The timestamp when the interrupt should happen.
+ */
+ fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update {
+ val now = clock.millis()
+ val futureQueue = futureQueue
+
+ require(timestamp >= now) { "Timestamp must be in the future" }
+
+ val update = Update(ctx, timestamp)
+ futureQueue.add(update)
+
+ // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference.
+ if (futureQueue.peek() === update) {
+ trySchedule(futureQueue, futureInvocations)
+ }
+
+ return update
+ }
+
+ override fun newContext(
+ consumer: SimResourceConsumer,
+ provider: SimResourceProviderLogic,
+ parent: SimResourceSystem?
+ ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider)
+
+ override fun pushBatch() {
+ batchIndex++
+ }
+
+ override fun popBatch() {
+ try {
+ // Flush the work if the platform is not already running
+ if (batchIndex == 1 && queue.isNotEmpty()) {
+ runInterpreter()
+ }
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Interpret all actions that are scheduled for the current timestamp.
+ */
+ private fun runInterpreter() {
+ val now = clock.millis()
+ val queue = queue
+ val futureQueue = futureQueue
+ val futureInvocations = futureInvocations
+ val visited = visited
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val update = futureQueue.peek() ?: break
+
+ assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" }
+
+ if (update.timestamp > now && !update.isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(futureQueue, futureInvocations)
+ break
+ }
+
+ futureQueue.poll()
+
+ if (update(now) && visited.add(update.ctx)) {
+ collectAncestors(update.ctx, visited)
+ }
+ }
+
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val update = queue.poll() ?: break
+ if (update(now) && visited.add(update.ctx)) {
+ collectAncestors(update.ctx, visited)
+ }
+ }
+
+ for (system in visited) {
+ system.onConverge(now)
+ }
+
+ visited.clear()
+ } while (queue.isNotEmpty())
+ }
+
+ /**
+ * Try to schedule the next interpreter event.
+ */
+ private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) {
+ val nextTimer = queue.peek()
+ val now = clock.millis()
+
+ // Check whether we need to update our schedule:
+ if (nextTimer == null) {
+ // Case 1: all timers are cancelled
+ for (invocation in scheduled) {
+ invocation.cancel()
+ }
+ scheduled.clear()
+ return
+ }
+
+ while (true) {
+ val invocation = scheduled.peekFirst()
+ if (invocation == null || invocation.timestamp > nextTimer.timestamp) {
+ // Case 2: A new timer was registered ahead of the other timers.
+ // Solution: Schedule a new scheduler invocation
+ val nextTimestamp = nextTimer.timestamp
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(
+ nextTimestamp - now,
+ {
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ },
+ context
+ )
+ scheduled.addFirst(Invocation(nextTimestamp, handle))
+ break
+ } else if (invocation.timestamp < nextTimer.timestamp) {
+ // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
+ // Solution: Cancel the next scheduler invocation
+ invocation.cancel()
+ scheduled.pollFirst()
+ } else {
+ break
+ }
+ }
+ }
+
+ /**
+ * Collect all the ancestors of the specified [system].
+ */
+ private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) {
+ val parent = system.parent
+ if (parent != null) {
+ systems.add(parent)
+ collectAncestors(parent, systems)
+ }
+ }
+
+ /**
+ * A future interpreter invocation.
+ *
+ * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private data class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField private val disposableHandle: DisposableHandle
+ ) {
+ /**
+ * Cancel the interpreter invocation.
+ */
+ fun cancel() = disposableHandle.dispose()
+ }
+
+ /**
+ * An update call for [ctx] that is scheduled for [timestamp].
+ *
+ * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be
+ * cancelled if the resource context was invalidated in the meantime.
+ */
+ class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> {
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ @JvmField
+ var isCancelled: Boolean = false
+
+ /**
+ * Cancel the update.
+ */
+ fun cancel() {
+ isCancelled = true
+ }
+
+ /**
+ * Immediately run update.
+ */
+ operator fun invoke(timestamp: Long): Boolean {
+ val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp)
+ if (shouldExecute) {
+ ctx.doUpdate(timestamp)
+ }
+ return shouldExecute
+ }
+
+ override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp)
+
+ override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 2b32300e..51024e80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
@@ -41,7 +42,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val forwarder = SimResourceForwarder()
@@ -58,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest {
source.startConsumer(adapter)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertAll(
@@ -66,13 +67,13 @@ internal class SimResourceAggregatorMaxMinTest {
{ assertEquals(listOf(0.0, 0.5, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -86,20 +87,20 @@ internal class SimResourceAggregatorMaxMinTest {
val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(adapter)
+ aggregator.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, clock.millis()) },
{ assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testOvercommit() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -114,19 +115,19 @@ internal class SimResourceAggregatorMaxMinTest {
.andThen(SimResourceCommand.Exit)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertEquals(1000, clock.millis())
verify(exactly = 2) { consumer.onNext(any()) }
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testException() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -141,17 +142,17 @@ internal class SimResourceAggregatorMaxMinTest {
.andThenThrows(IllegalStateException("Test Exception"))
try {
- assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ assertThrows<IllegalStateException> { aggregator.consume(consumer) }
yield()
assertEquals(SimResourceState.Pending, sources[0].state)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -163,20 +164,20 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(4.0, 1.0)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(1000)
sources[0].capacity = 0.5
}
yield()
assertEquals(2334, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -188,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(1.0, 0.5)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(500)
sources[0].capacity = 0.5
}
yield()
assertEquals(1000, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
+ }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ val sources = listOf(
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+ assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
+ } finally {
+ aggregator.close()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 2e2d6588..6cb507ce 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -26,98 +26,109 @@ import io.mockk.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.impl.SimResourceContextImpl
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
- * A test suite for the [SimAbstractResourceContext] class.
+ * A test suite for the [SimResourceContextImpl] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
- context.flush(isIntermediate = false)
+ context.doUpdate(interpreter.clock.millis())
}
@Test
fun testIntermediateFlush() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
})
+ val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
- context.flush(isIntermediate = true)
+ context.doUpdate(interpreter.clock.millis())
- verify(exactly = 2) { context.onConsume(any(), any(), any()) }
+ verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) }
}
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
})
+ val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
context.start()
delay(5)
- context.flush(isIntermediate = true)
+ context.invalidate()
delay(5)
- context.flush(isIntermediate = true)
+ context.invalidate()
assertAll(
- { verify(exactly = 2) { context.onIdle(any()) } },
- { verify(exactly = 1) { context.onFinish() } }
+ { verify(exactly = 2) { logic.onIdle(any(), any()) } },
+ { verify(exactly = 1) { logic.onFinish(any()) } }
)
}
@Test
fun testDoubleStart() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
- assertThrows<IllegalStateException> { context.start() }
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
}
@Test
fun testIdempodentCapacityChange() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ context.capacity = 4200.0
context.start()
context.capacity = 4200.0
@@ -126,17 +137,19 @@ class SimResourceContextTest {
@Test
fun testFailureNoInfiniteLoop() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Exit
every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
- }
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
+ })
+
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 5e86088d..08d88093 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceSource] class.
@@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -63,7 +64,7 @@ class SimResourceSourceTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -83,7 +84,7 @@ class SimResourceSourceTest {
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -110,7 +111,7 @@ class SimResourceSourceTest {
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -133,7 +134,7 @@ class SimResourceSourceTest {
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
lateinit var resCtx: SimResourceContext
@@ -174,7 +175,7 @@ class SimResourceSourceTest {
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -193,7 +194,7 @@ class SimResourceSourceTest {
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -213,7 +214,7 @@ class SimResourceSourceTest {
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -236,7 +237,7 @@ class SimResourceSourceTest {
@Test
fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -257,7 +258,7 @@ class SimResourceSourceTest {
@Test
fun testCloseDuringConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -279,7 +280,7 @@ class SimResourceSourceTest {
@Test
fun testIdle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -301,7 +302,7 @@ class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -321,7 +322,7 @@ class SimResourceSourceTest {
@Test
fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 32b6d8ad..ad8d82e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -44,7 +45,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val speed = mutableListOf<Double>()
@@ -66,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest {
source.startConsumer(adapter)
switch.addInput(forwarder)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -86,7 +87,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -97,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -113,7 +114,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer {
@@ -141,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -158,7 +159,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -169,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- switch.addOutput(3200.0)
- assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ switch.newOutput()
+ assertThrows<IllegalStateException> { switch.newOutput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index e7dec172..e4292ec0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitch] implementations
@@ -40,13 +41,13 @@ import org.opendc.simulator.resources.consumer.SimTraceConsumer
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val switch = SimResourceSwitchMaxMin(scheduler)
val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
- val provider = switch.addOutput(1000.0)
+ val provider = switch.newOutput()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
@@ -64,27 +65,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
-
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L
val workload =
@@ -97,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(scheduler, listener)
- val provider = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val provider = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -109,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest {
}
assertAll(
- { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -121,27 +102,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
-
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L
val workloadA =
@@ -163,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(scheduler, listener)
- val providerA = switch.addOutput(3200.0)
- val providerB = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val providerA = switch.newOutput()
+ val providerB = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -180,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest {
switch.close()
}
assertAll(
- { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 880e1755..810052b8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceTransformer] class.
@@ -41,7 +42,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExitImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
launch {
@@ -61,7 +62,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExit() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
launch {
@@ -122,7 +123,7 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -141,7 +142,7 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -160,7 +161,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExitPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -176,7 +177,7 @@ internal class SimResourceTransformerTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -195,7 +196,7 @@ internal class SimResourceTransformerTest {
@Test
fun testTransformExit() = runBlockingSimulation {
val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -205,4 +206,21 @@ internal class SimResourceTransformerTest {
assertEquals(0, clock.millis())
verify(exactly = 1) { consumer.onNext(any()) }
}
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index ac8b5814..db4fe856 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimWorkConsumer] class.
@@ -35,7 +36,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 1.0)
@@ -50,7 +51,7 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 0.5)