summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-13 18:16:19 +0000
committerGitHub <noreply@github.com>2022-11-13 18:16:19 +0000
commit52eed48441693149993db79b63431b99e0973027 (patch)
treeba267db531bc3d81409ddfe9caeb6d3b5a65e8c8
parent183cfa96910ebb74c668dea7ef98071966f8fcb9 (diff)
parent33d91ef30ad7bcb73365934fe536461210d1082a (diff)
merge: Increase minimum Java version to 17 (#115)
This pull request increases the minimum version of Java required by OpenDC to 17. This new version of Java introduces several new features compared to our old minimum version (11), which we attempt to apply in this conversion. ## Implementation Notes :hammer_and_pick: * Increase minimum Java version to Java 17 * Use RandomGenerator as randomness source * Add common dispatcher interface * Add compatibility with Kotlin coroutines * Use InstantSource as time source * Re-implement SimulationScheduler as Dispatcher * Replace use of CoroutineContext by Dispatcher ## External Dependencies :four_leaf_clover: * Java 17 ## Breaking API Changes :warning: * The use of `CoroutineContext` and `Clock` as parameters of classes has been replaced by the `Dispatcher` interface. * The use of `Clock` has been replaced by `InstantSource` which does not carry time zone info. * The use of `Random` and `SplittableRandom` as parameter type has been replaced by `RandomGenerator`
-rw-r--r--.github/workflows/build.yml6
-rw-r--r--.github/workflows/release.yml6
-rw-r--r--.github/workflows/test-gradle-rc.yml3
-rw-r--r--.github/workflows/test-java-ea.yml3
-rw-r--r--README.md39
-rw-r--r--buildSrc/src/main/kotlin/Libs.kt37
-rw-r--r--buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts4
-rw-r--r--buildSrc/src/main/kotlin/java-conventions.gradle.kts6
-rw-r--r--buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts9
-rw-r--r--buildSrc/src/main/kotlin/testing-conventions.gradle.kts26
-rw-r--r--gradle/libs.versions.toml10
-rw-r--r--gradle/wrapper/gradle-wrapper.jarbin59821 -> 60756 bytes
-rw-r--r--gradle/wrapper/gradle-wrapper.properties2
-rwxr-xr-xgradlew6
-rw-r--r--gradlew.bat14
-rw-r--r--opendc-common/src/main/java/org/opendc/common/Dispatcher.java63
-rw-r--r--opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java33
-rw-r--r--opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java33
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/Pacer.java (renamed from opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt)80
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java256
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt)48
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt32
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt230
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt69
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt18
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt31
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt12
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt7
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt26
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt16
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt12
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt11
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt17
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt11
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt16
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt7
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt2
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt8
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt8
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt11
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt12
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt5
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt6
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt6
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt23
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt7
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt18
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt7
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt3
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt11
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt10
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt24
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt20
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt11
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt4
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt20
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt16
-rw-r--r--opendc-simulator/opendc-simulator-core/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java (renamed from opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java)134
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt74
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt34
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt36
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt (renamed from opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt)23
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt98
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt18
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt4
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt13
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt17
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt6
-rw-r--r--site/docs/advanced-guides/toolchain.md2
-rw-r--r--site/docs/getting-started/0-installation.md2
112 files changed, 1289 insertions, 958 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 0747ad33..96b86233 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -13,10 +13,10 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
- java: [ 11, 17, 18 ]
+ java: [ 17, 19 ]
include:
- os: windows-latest
- java: 18
+ java: 17
steps:
- name: Checkout repository
uses: actions/checkout@v3
@@ -36,7 +36,7 @@ jobs:
cache-read-only: ${{ github.ref != 'refs/heads/main' }}
- name: Publish report
if: always()
- uses: mikepenz/action-junit-report@v3.5.2
+ uses: mikepenz/action-junit-report@v3
with:
check_name: test (Java ${{ matrix.java }})
report_paths: '**/build/test-results/test/TEST-*.xml'
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 2d893edf..4f62843c 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -8,9 +8,6 @@ jobs:
build:
name: Build OpenDC
runs-on: ubuntu-latest
- strategy:
- matrix:
- java: [ 17 ]
steps:
- name: Checkout repository
uses: actions/checkout@v3
@@ -20,13 +17,12 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: ${{ matrix.java }}
+ java-version: 17
- name: Publish with Gradle
uses: gradle/gradle-build-action@v2
with:
arguments: publish
env:
- ORG_GRADLE_PROJECT_signingKeyId: F8134F9C
ORG_GRADLE_PROJECT_signingKey: ${{ secrets.GPG_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.GPG_PASSPHRASE }}
ORG_GRADLE_PROJECT_ossrhUsername: ${{ secrets.OSSRH_USERNAME }}
diff --git a/.github/workflows/test-gradle-rc.yml b/.github/workflows/test-gradle-rc.yml
index a39d36d7..b58bbcb1 100644
--- a/.github/workflows/test-gradle-rc.yml
+++ b/.github/workflows/test-gradle-rc.yml
@@ -11,8 +11,9 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: 18
+ java-version: 19
- uses: gradle/gradle-build-action@v2
with:
+ cache-disabled: true
gradle-version: release-candidate
arguments: build --dry-run # just test build configuration
diff --git a/.github/workflows/test-java-ea.yml b/.github/workflows/test-java-ea.yml
index f3c31928..d0324b1b 100644
--- a/.github/workflows/test-java-ea.yml
+++ b/.github/workflows/test-java-ea.yml
@@ -11,8 +11,9 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: 19-ea
+ java-version: 20-ea
- uses: gradle/gradle-build-action@v2
with:
+ cache-disabled: true
gradle-version: release-candidate
arguments: build
diff --git a/README.md b/README.md
index a633b264..a82848bb 100644
--- a/README.md
+++ b/README.md
@@ -13,42 +13,23 @@ Collaborative Datacenter Simulation and Exploration for Everybody
-----
-OpenDC is a free and open-source platform for datacenter simulation aimed at both research and education.
+This repository is the home of the OpenDC project, a free and open-source platform for cloud datacenter simulation.
-![Datacenter construction in OpenDC](site/src/components/HomepageFeatures/screenshot-construction.png)
+## Latest Release
-Users can construct datacenters (see above) and define portfolios of scenarios (experiments) to see how these
-datacenters perform under different workloads and schedulers (see below).
-
-![Datacenter simulation in OpenDC](site/src/components/HomepageFeatures/screenshot-construction.png))
-
-The simulator is accessible both as a ready-to-use website hosted by us at [app.opendc.org](https://app.opendc.org), and
-as source code that users can run locally on their own machine or via Docker.
-
-To learn more
-about OpenDC, have a look through our paper [OpenDC 2.0](https://atlarge-research.com/pdfs/ccgrid21-opendc-paper.pdf)
-or on our [vision](https://atlarge-research.com/pdfs/opendc-vision17ispdc_cr.pdf).
-
-🛠 OpenDC is a project by the [@Large Research Group](https://atlarge-research.com).
-
-🐟 OpenDC comes bundled
-with [Capelin](https://repository.tudelft.nl/islandora/object/uuid:d6d50861-86a3-4dd3-a13f-42d84db7af66?collection=education)
-, the capacity planning tool for cloud datacenters based on portfolios of what-if scenarios. More information on how to
-use and extend Capelin coming soon!
+- General Availability (GA): [OpenDC v2.0](https://github.com/atlarge-research/opendc/releases/tag/v2.0) (May 10, 2021)
+- Preview (Release Candidate): n/a
## Documentation
-You can find the OpenDC documentation [on the website](https://opendc.org/).
-
-Check out the [Getting Started](https://opendc.org/docs/getting-started.html) page for a quick overview.
-
+You can find the OpenDC documentation [on the website](https://atlarge-research.github.io/opendc/).
The documentation is divided into several sections:
-* [Main Concepts](https://opendc.org/docs/category/getting-started)
-* [Tutorials](https://opendc.org/docs/category/tutorials)
-* [Advanced Guides](https://opendc.org/docs/category/advanced-guides)
-* [Where to Get Support](https://opendc.org/community/support)
-* [Contributing Guide](https://opendc.org/community/contributing)
+* [Getting Started](https://atlarge-research.github.io/opendc/docs/category/getting-started/)
+* [Tutorials](https://atlarge-research.github.io/opendc/docs/category/tutorials/)
+* [Advanced Guides](https://atlarge-research.github.io/opendc/docs/category/advanced-guides/)
+* [Where to Get Support](https://atlarge-research.github.io/opendc/community/support/)
+* [Contributing Guide](https://atlarge-research.github.io/opendc/community/contributing/)
The source code for the documentation is located in [site](site).
diff --git a/buildSrc/src/main/kotlin/Libs.kt b/buildSrc/src/main/kotlin/Libs.kt
index 6a73e1b9..09432b06 100644
--- a/buildSrc/src/main/kotlin/Libs.kt
+++ b/buildSrc/src/main/kotlin/Libs.kt
@@ -24,32 +24,27 @@
import org.gradle.api.JavaVersion
import org.gradle.api.Project
+import org.gradle.api.artifacts.VersionCatalog
import org.gradle.api.artifacts.VersionCatalogsExtension
import org.gradle.kotlin.dsl.getByType
/**
- * This class makes the version catalog accessible for the build scripts until Gradle adds support for it.
- *
- * See https://github.com/gradle/gradle/issues/15383
+ * Obtain the default [VersionCatalog] of the project.
*/
-public class Libs(project: Project) {
- /**
- * The version catalog of the project.
- */
- private val versionCatalog = project.extensions.getByType(VersionCatalogsExtension::class).named("libs")
+public val Project.defaultVersionCatalog: VersionCatalog
+ get() = project.extensions.getByType(VersionCatalogsExtension::class).named("libs")
- /**
- * Obtain the version for the specified [dependency][name].
- */
- operator fun get(name: String): String {
- val dep = versionCatalog.findLibrary(name).get().get()
- return "${dep.module.group}:${dep.module.name}:${dep.versionConstraint.displayName}"
- }
+/**
+ * Obtain the dependency string for the specified [name].
+ */
+public operator fun VersionCatalog.get(name: String): String {
+ val dep = findLibrary(name).get().get()
+ return "${dep.module.group}:${dep.module.name}:${dep.versionConstraint.displayName}"
+}
- companion object {
- /**
- * The JVM version to target.
- */
- val jvmTarget = JavaVersion.VERSION_11
- }
+/**
+ * Obtain the string representation of the version with the given [name].
+ */
+public fun VersionCatalog.getVersion(name: String): String {
+ return findVersion(name).get().displayName
}
diff --git a/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts b/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
index 554d2279..e16733a4 100644
--- a/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
@@ -50,7 +50,7 @@ tasks.named("jmh", JMHTask::class) {
dependencies {
constraints {
- val libs = Libs(project)
- jmh(libs["commons.math3"]) // XXX Force JMH to use the same commons-math3 version as OpenDC
+ val versionCatalog = project.defaultVersionCatalog
+ jmh(versionCatalog["commons.math3"]) // XXX Force JMH to use the same commons-math3 version as OpenDC
}
}
diff --git a/buildSrc/src/main/kotlin/java-conventions.gradle.kts b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
index 8857d4ab..875ae2a7 100644
--- a/buildSrc/src/main/kotlin/java-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
@@ -31,8 +31,10 @@ repositories {
}
java {
- sourceCompatibility = Libs.jvmTarget
- targetCompatibility = Libs.jvmTarget
+ toolchain {
+ val javaVersion = project.defaultVersionCatalog.getVersion("java")
+ languageVersion.set(JavaLanguageVersion.of(javaVersion))
+ }
}
tasks.withType<JavaCompile> {
diff --git a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
index 5ccc06a4..79afdf7c 100644
--- a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
@@ -29,8 +29,15 @@ plugins {
}
/* Project configuration */
+
+kotlin {
+ jvmToolchain {
+ val javaVersion = project.defaultVersionCatalog.getVersion("java")
+ languageVersion.set(JavaLanguageVersion.of(javaVersion))
+ }
+}
+
tasks.withType<KotlinCompile>().configureEach {
- kotlinOptions.jvmTarget = Libs.jvmTarget.toString()
kotlinOptions.freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn"
kotlinOptions.freeCompilerArgs += "-Xjvm-default=all"
}
diff --git a/buildSrc/src/main/kotlin/testing-conventions.gradle.kts b/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
index ebeb58a4..a7fa9da9 100644
--- a/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
@@ -34,10 +34,26 @@ tasks.test {
}
dependencies {
- val libs = Libs(project)
+ val versionCatalog = project.defaultVersionCatalog
- testImplementation(libs["junit.jupiter.api"])
- testImplementation(libs["junit.jupiter.params"])
- testImplementation(libs["mockk"])
- testRuntimeOnly(libs["junit.jupiter.engine"])
+ testImplementation(versionCatalog["junit.jupiter.api"])
+ testImplementation(versionCatalog["junit.jupiter.params"])
+ testImplementation(versionCatalog["mockk"])
+ testRuntimeOnly(versionCatalog["junit.jupiter.engine"])
+}
+
+tasks.register<Test>("testsOn18") {
+ javaLauncher.set(javaToolchains.launcherFor {
+ languageVersion.set(JavaLanguageVersion.of(18))
+ })
+
+ useJUnitPlatform()
+}
+
+tasks.register<Test>("testsOn19") {
+ javaLauncher.set(javaToolchains.launcherFor {
+ languageVersion.set(JavaLanguageVersion.of(19))
+ })
+
+ useJUnitPlatform()
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 033e8cfb..89de95b6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -3,16 +3,17 @@ calcite = "1.32.0"
clikt = "3.5.0"
commons-math3 = "3.6.1"
dokka = "1.7.10"
-gradle-node = "3.4.0"
+gradle-node = "3.5.0"
hadoop = "3.3.4"
-jackson = "2.13.4"
+jackson = "2.14.0"
jandex-gradle = "0.13.2"
+java = "17"
jline = "3.21.0"
jmh-gradle = "0.6.8"
jakarta-validation = "2.0.2"
junit-jupiter = "5.9.1"
kotlin = "1.7.20"
-kotlin-logging = "3.0.0"
+kotlin-logging = "3.0.4"
kotlinx-coroutines = "1.6.4"
log4j = "2.19.0"
microprofile-openapi = "3.0"
@@ -21,7 +22,7 @@ mockk = "1.13.2"
node = "18.12.0"
parquet = "1.12.3"
progressbar = "0.9.3"
-quarkus = "2.13.1.Final"
+quarkus = "2.14.0.Final"
quarkus-junit5-mockk = "1.1.1"
sentry = "6.4.3"
slf4j = "2.0.3"
@@ -36,6 +37,7 @@ kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core",
# Logging
kotlin-logging = { module = "io.github.microutils:kotlin-logging", version.ref = "kotlin-logging" }
+slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
log4j-core = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" }
log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" }
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 41d9927a..249e5832 100644
--- a/gradle/wrapper/gradle-wrapper.jar
+++ b/gradle/wrapper/gradle-wrapper.jar
Binary files differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 8049c684..8fab764a 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-rc-3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 1b6c7873..a69d9cb6 100755
--- a/gradlew
+++ b/gradlew
@@ -205,6 +205,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \
"$@"
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
diff --git a/gradlew.bat b/gradlew.bat
index 107acd32..f127cfd4 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
@rem limitations under the License.
@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,7 +25,7 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto execute
+if %ERRORLEVEL% equ 0 goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -75,13 +75,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
diff --git a/opendc-common/src/main/java/org/opendc/common/Dispatcher.java b/opendc-common/src/main/java/org/opendc/common/Dispatcher.java
new file mode 100644
index 00000000..8c919311
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/Dispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+import java.time.InstantSource;
+
+/**
+ * A {@link Dispatcher} is used in OpenDC to schedule the execution of future tasks over potentially multiple threads.
+ */
+public interface Dispatcher {
+ /**
+ * Return the time source of the dispatcher as an {@link InstantSource}.
+ */
+ InstantSource getTimeSource();
+
+ /**
+ * Schedule the specified {@link Runnable} to run as soon as possible.
+ *
+ * @param command The task to execute.
+ */
+ default void schedule(Runnable command) {
+ schedule(0, command);
+ }
+
+ /**
+ * Schedule the specified {@link Runnable} to run after the specified <code>delay</code>.
+ * <p>
+ * Use this method to eliminate potential allocations in case the task does not need to be cancellable.
+ *
+ * @param delayMs The time from now to the delayed execution (in milliseconds).
+ * @param command The task to execute.
+ */
+ void schedule(long delayMs, Runnable command);
+
+ /**
+ * Schedule the specified {@link Runnable} to run after the specified <code>delay</code>.
+ *
+ * @param delayMs The time from now to the delayed execution (in milliseconds).
+ * @param command The task to execute.
+ * @return A {@link DispatcherHandle} representing pending completion of the task.
+ */
+ DispatcherHandle scheduleCancellable(long delayMs, Runnable command);
+}
diff --git a/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java b/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java
new file mode 100644
index 00000000..e34e5e11
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+/**
+ * A handle returned by a {@link Dispatcher} representing a scheduled task.
+ */
+public interface DispatcherHandle {
+ /**
+ * Attempt to cancel execution of the task.
+ */
+ void cancel();
+}
diff --git a/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java b/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java
new file mode 100644
index 00000000..2717bd0f
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+/**
+ * Interface to expose the {@link Dispatcher} instance used by a class.
+ */
+public interface DispatcherProvider {
+ /**
+ * Return the {@link Dispatcher} associated with this class.
+ */
+ Dispatcher getDispatcher();
+}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
index edf607d2..5b8d8cb0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt
+++ b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
@@ -20,75 +20,75 @@
* SOFTWARE.
*/
-package org.opendc.common.util
+package org.opendc.common.util;
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.lang.Runnable
-import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
+import java.util.function.LongConsumer;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
/**
* Helper class to pace the incoming scheduling requests.
- *
- * @param context The [CoroutineContext] in which the pacer runs.
- * @param clock The virtual simulation clock.
- * @param quantum The scheduling quantum.
- * @param process The process to invoke for the incoming requests.
*/
-public class Pacer(
- private val context: CoroutineContext,
- private val clock: Clock,
- private val quantum: Long,
- private val process: (Long) -> Unit
-) {
+public final class Pacer {
+ private final Dispatcher dispatcher;
+ private final long quantumMs;
+ private final LongConsumer process;
+
/**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ * The current {@link DispatcherHandle} representing the pending scheduling cycle.
*/
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+ private DispatcherHandle handle;
/**
- * The current [DisposableHandle] representing the pending scheduling cycle.
+ * Construct a {@link Pacer} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ * @param quantumMs The scheduling quantum in milliseconds.
+ * @param process The process to invoke for the incoming requests.
*/
- private var handle: DisposableHandle? = null
+ public Pacer(Dispatcher dispatcher, long quantumMs, LongConsumer process) {
+ this.dispatcher = dispatcher;
+ this.quantumMs = quantumMs;
+ this.process = process;
+ }
/**
* Determine whether a scheduling cycle is pending.
*/
- public val isPending: Boolean get() = handle != null
+ public boolean isPending() {
+ return handle != null;
+ }
/**
* Enqueue a new scheduling cycle.
*/
- public fun enqueue() {
+ public void enqueue() {
if (handle != null) {
- return
+ return;
}
- val quantum = quantum
- val now = clock.millis()
+ final Dispatcher dispatcher = this.dispatcher;
+ long quantumMs = this.quantumMs;
+ long now = dispatcher.getTimeSource().millis();
// We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// We calculate here the delay until the next scheduling slot.
- val timeUntilNextSlot = quantum - (now % quantum)
+ long timeUntilNextSlot = quantumMs - (now % quantumMs);
- @OptIn(InternalCoroutinesApi::class)
- handle = delay.invokeOnTimeout(timeUntilNextSlot, {
- process(now + timeUntilNextSlot)
- handle = null
- }, context)
+ handle = dispatcher.scheduleCancellable(timeUntilNextSlot, () -> {
+ process.accept(now + timeUntilNextSlot);
+ handle = null;
+ });
}
/**
* Cancel the currently pending scheduling cycle.
*/
- public fun cancel() {
- val handle = handle ?: return
- this.handle = null
- handle.dispose()
+ public void cancel() {
+ final DispatcherHandle handle = this.handle;
+ if (handle != null) {
+ this.handle = null;
+ handle.cancel();
+ }
}
}
diff --git a/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
new file mode 100644
index 00000000..a85605e9
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.util;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.PriorityQueue;
+import org.jetbrains.annotations.NotNull;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
+
+/**
+ * A {@link TimerScheduler} facilitates scheduled execution of future tasks.
+ */
+public final class TimerScheduler<T> {
+ private final Dispatcher dispatcher;
+
+ /**
+ * The stack of the invocations to occur in the future.
+ */
+ private final ArrayDeque<Invocation> invocations = new ArrayDeque<>();
+
+ /**
+ * A priority queue containing the tasks to be scheduled in the future.
+ */
+ private final PriorityQueue<Timer<T>> queue = new PriorityQueue<Timer<T>>();
+
+ /**
+ * A map that keeps track of the timers.
+ */
+ private final HashMap<T, Timer<T>> timers = new HashMap<>();
+
+ /**
+ * Construct a {@link TimerScheduler} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ */
+ public TimerScheduler(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] after [delay].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimer(T key, long delay, Runnable block) {
+ startSingleTimerTo(key, dispatcher.getTimeSource().millis() + delay, block);
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimerTo(T key, long timestamp, Runnable block) {
+ long now = dispatcher.getTimeSource().millis();
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final ArrayDeque<Invocation> invocations = this.invocations;
+
+ if (timestamp < now) {
+ throw new IllegalArgumentException("Timestamp must be in the future");
+ }
+
+ timers.compute(key, (k, old) -> {
+ if (old != null && old.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old.block = block;
+ return old;
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ Timer<T> timer = new Timer<T>(key, timestamp, block);
+
+ if (old != null) {
+ old.isCancelled = true;
+ }
+ queue.add(timer);
+ trySchedule(now, invocations, timestamp);
+
+ return timer;
+ }
+ });
+ }
+
+ /**
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
+ */
+ public boolean isTimerActive(T key) {
+ return timers.containsKey(key);
+ }
+
+ /**
+ * Cancel a timer with a given key.
+ * <p>
+ * If canceling a timer that was already canceled, or key never was used to start
+ * a timer this operation will do nothing.
+ *
+ * @param key The key of the timer to cancel.
+ */
+ public void cancel(T key) {
+ final Timer<T> timer = timers.remove(key);
+
+ // Mark the timer as cancelled
+ if (timer != null) {
+ timer.isCancelled = true;
+ }
+ }
+
+ /**
+ * Cancel all timers.
+ */
+ public void cancelAll() {
+ queue.clear();
+ timers.clear();
+
+ // Cancel all pending invocations
+ for (final Invocation invocation : invocations) {
+ invocation.cancel();
+ }
+
+ invocations.clear();
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private void trySchedule(long now, ArrayDeque<Invocation> scheduled, long target) {
+ final Invocation head = scheduled.peek();
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ final DispatcherHandle handle = dispatcher.scheduleCancellable(target - now, this::doRunTimers);
+ scheduled.addFirst(new Invocation(target, handle));
+ }
+ }
+
+ /**
+ * This method is invoked when the earliest timer expires.
+ */
+ private void doRunTimers() {
+ final ArrayDeque<Invocation> invocations = this.invocations;
+ final Invocation invocation = invocations.remove();
+
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final HashMap<T, Timer<T>> timers = this.timers;
+ long now = invocation.timestamp;
+
+ while (!queue.isEmpty()) {
+ final Timer<T> timer = queue.peek();
+
+ long timestamp = timer.timestamp;
+ boolean isCancelled = timer.isCancelled;
+
+ assert timestamp >= now : "Found task in the past";
+
+ if (timestamp > now && !isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(now, invocations, timestamp);
+ break;
+ }
+
+ queue.poll();
+
+ if (!isCancelled) {
+ timers.remove(timer.key);
+ timer.run();
+ }
+ }
+ }
+
+ /**
+ * A task that is scheduled to run in the future.
+ */
+ private static class Timer<T> implements Comparable<Timer<T>> {
+ final T key;
+ final long timestamp;
+ Runnable block;
+
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ boolean isCancelled;
+
+ /**
+ * Construct a {@link Timer} instance.
+ */
+ public Timer(T key, long timestamp, Runnable block) {
+ this.key = key;
+ this.timestamp = timestamp;
+ this.block = block;
+ }
+
+ /**
+ * Run the task.
+ */
+ void run() {
+ block.run();
+ }
+
+ @Override
+ public int compareTo(@NotNull Timer<T> other) {
+ return Long.compare(timestamp, other.timestamp);
+ }
+ }
+
+ /**
+ * A future engine invocation.
+ * <p>
+ * This class is used to keep track of the future engine invocations created using the {@link Dispatcher} instance.
+ * In case the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private record Invocation(long timestamp, DispatcherHandle handle) {
+ /**
+ * Cancel the engine invocation.
+ */
+ void cancel() {
+ handle.cancel();
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt b/opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt
index cacbbbf7..63744ef9 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.kotlin
+package org.opendc.common
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
@@ -28,61 +28,37 @@ import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
-import org.opendc.simulator.SimulationScheduler
-import java.lang.Runnable
-import java.time.Clock
import kotlin.coroutines.CoroutineContext
/**
- * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual
- * clock for time management.
+ * A [CoroutineDispatcher] that uses a [Dispatcher] to dispatch pending co-routines.
*
- * @param scheduler The [SimulationScheduler] used to manage the execution of future tasks.
+ * @param dispatcher The [Dispatcher] used to manage the execution of future tasks.
*/
@OptIn(InternalCoroutinesApi::class)
-public class SimulationCoroutineDispatcher(
- override val scheduler: SimulationScheduler = SimulationScheduler()
-) : CoroutineDispatcher(), SimulationController, Delay {
- /**
- * The virtual clock of this dispatcher.
- */
- override val clock: Clock = scheduler.clock
-
+internal class DispatcherCoroutineDispatcher(private val dispatcher: Dispatcher) : CoroutineDispatcher(), Delay, DispatcherProvider {
override fun dispatch(context: CoroutineContext, block: Runnable) {
block.run()
}
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
- scheduler.execute(block)
+ dispatcher.schedule(block)
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- scheduler.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) })
+ dispatcher.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) })
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- return object : DisposableHandle {
- private val deadline = (scheduler.currentTime + timeMillis).let { if (it >= 0) it else Long.MAX_VALUE }
- private val id = scheduler.schedule(timeMillis, block)
-
- override fun dispose() {
- scheduler.cancel(deadline, id)
- }
- }
+ val handle = dispatcher.scheduleCancellable(timeMillis, block)
+ return DisposableHandle { handle.cancel() }
}
- override fun toString(): String {
- return "SimulationCoroutineDispatcher[time=${scheduler.currentTime}ms]"
- }
-
- override fun advanceUntilIdle(): Long {
- val scheduler = scheduler
- val oldTime = scheduler.currentTime
+ override fun getDispatcher(): Dispatcher = dispatcher
- scheduler.advanceUntilIdle()
-
- return scheduler.currentTime - oldTime
+ override fun toString(): String {
+ return "DispatcherCoroutineDispatcher[dispatcher=$dispatcher]"
}
/**
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt b/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt
new file mode 100644
index 00000000..6dcd17af
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common
+
+import kotlinx.coroutines.CoroutineDispatcher
+
+/**
+ * Convert a [Dispatcher] to a [CoroutineDispatcher].
+ */
+public fun Dispatcher.asCoroutineDispatcher(): CoroutineDispatcher {
+ return DispatcherCoroutineDispatcher(this)
+}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
deleted file mode 100644
index 44d6010f..00000000
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.common.util
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.time.Clock
-import java.util.ArrayDeque
-import java.util.PriorityQueue
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A TimerScheduler facilitates scheduled execution of future tasks.
- *
- * @param context The [CoroutineContext] to run the tasks with.
- * @param clock The clock to keep track of the time.
- */
-public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The stack of the invocations to occur in the future.
- */
- private val invocations = ArrayDeque<Invocation>()
-
- /**
- * A priority queue containing the tasks to be scheduled in the future.
- */
- private val queue = PriorityQueue<Timer<T>>()
-
- /**
- * A map that keeps track of the timers.
- */
- private val timers = mutableMapOf<T, Timer<T>>()
-
- /**
- * Start a timer that will invoke the specified [block] after [delay].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param delay The delay before invoking the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
- startSingleTimerTo(key, clock.millis() + delay, block)
- }
-
- /**
- * Start a timer that will invoke the specified [block] at [timestamp].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param timestamp The timestamp at which to invoke the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
- val now = clock.millis()
- val queue = queue
- val invocations = invocations
-
- require(timestamp >= now) { "Timestamp must be in the future" }
-
- timers.compute(key) { _, old ->
- if (old != null && old.timestamp == timestamp) {
- // Fast-path: timer for the same timestamp already exists
- old.block = block
- old
- } else {
- // Slow-path: cancel old timer and replace it with new timer
- val timer = Timer(key, timestamp, block)
-
- old?.isCancelled = true
- queue.add(timer)
- trySchedule(now, invocations, timestamp)
-
- timer
- }
- }
- }
-
- /**
- * Check if a timer with a given key is active.
- *
- * @param key The key to check if active.
- * @return `true` if the timer with the specified [key] is active, `false` otherwise.
- */
- public fun isTimerActive(key: T): Boolean = key in timers
-
- /**
- * Cancel a timer with a given key.
- *
- * If canceling a timer that was already canceled, or key never was used to start
- * a timer this operation will do nothing.
- *
- * @param key The key of the timer to cancel.
- */
- public fun cancel(key: T) {
- val timer = timers.remove(key)
-
- // Mark the timer as cancelled
- timer?.isCancelled = true
- }
-
- /**
- * Cancel all timers.
- */
- public fun cancelAll() {
- queue.clear()
- timers.clear()
-
- // Cancel all pending invocations
- for (invocation in invocations) {
- invocation.cancel()
- }
- invocations.clear()
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * This method is invoked when the earliest timer expires.
- */
- private fun doRunTimers() {
- val invocations = invocations
- val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue
- val now = invocation.timestamp
-
- while (queue.isNotEmpty()) {
- val timer = queue.peek()
-
- val timestamp = timer.timestamp
- val isCancelled = timer.isCancelled
-
- assert(timestamp >= now) { "Found task in the past" }
-
- if (timestamp > now && !isCancelled) {
- // Schedule a task for the next event to occur.
- trySchedule(now, invocations, timestamp)
- break
- }
-
- queue.poll()
-
- if (!isCancelled) {
- timers.remove(timer.key)
- timer()
- }
- }
- }
-
- /**
- * A task that is scheduled to run in the future.
- */
- private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> {
- /**
- * A flag to indicate that the task has been cancelled.
- */
- @JvmField
- var isCancelled: Boolean = false
-
- /**
- * Run the task.
- */
- operator fun invoke(): Unit = block()
-
- override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp)
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt
new file mode 100644
index 00000000..af8a7857
--- /dev/null
+++ b/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common
+
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for [DispatcherCoroutineDispatcher].
+ */
+class DispatcherCoroutineDispatcherTest {
+ @Test
+ fun testYield() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ val startTime = dispatcher.currentTime
+ yield()
+ assertEquals(startTime, dispatcher.currentTime)
+ }
+ }
+
+ @Test
+ fun testDelay() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ val startTime = dispatcher.currentTime
+ delay(10)
+ assertEquals(startTime + 10, dispatcher.currentTime)
+ }
+ }
+
+ @Test
+ fun testTimeout() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ assertThrows<TimeoutCancellationException> {
+ withTimeout(10) {
+ delay(1000)
+ }
+ }
+
+ assertEquals(10, dispatcher.currentTime)
+ }
+ }
+}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
index 3fae2ebc..3235b046 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
@@ -28,26 +28,18 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
-import kotlin.coroutines.EmptyCoroutineContext
/**
* Test suite for the [Pacer] class.
*/
class PacerTest {
@Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} }
- }
-
- @Test
fun testSingleEnqueue() {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -62,7 +54,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -80,7 +72,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -98,7 +90,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -116,7 +108,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
index 22a26111..3947fa2e 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
@@ -29,25 +29,18 @@ import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
-import kotlin.coroutines.EmptyCoroutineContext
/**
* A test suite for the [TimerScheduler] class.
*/
-internal class TimerSchedulerTest {
- @Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) }
- }
-
+class TimerSchedulerTest {
@Test
fun testBasicTimer() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
assertTrue(scheduler.isTimerActive(0))
@@ -58,7 +51,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelNonExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.cancel(1)
}
@@ -67,7 +60,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
fail()
@@ -76,7 +69,7 @@ internal class TimerSchedulerTest {
scheduler.startSingleTimer(1, 100) {
scheduler.cancel(0)
- assertEquals(100, clock.millis())
+ assertEquals(100, timeSource.millis())
}
}
}
@@ -84,7 +77,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelAll() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(1, 100) { fail() }
@@ -95,12 +88,12 @@ internal class TimerSchedulerTest {
@Test
fun testOverride() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 200) {
- assertEquals(200, clock.millis())
+ assertEquals(200, timeSource.millis())
}
}
}
@@ -108,12 +101,12 @@ internal class TimerSchedulerTest {
@Test
fun testOverrideBlock() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 1000) {
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
}
}
@@ -121,7 +114,7 @@ internal class TimerSchedulerTest {
@Test
fun testNegativeDelay() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 85222c10..9d7dcba6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,15 +22,14 @@
package org.opendc.compute.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* The [ComputeService] hosts the API implementation of the OpenDC Compute service.
@@ -80,18 +79,16 @@ public interface ComputeService : AutoCloseable {
/**
* Construct a new [ComputeService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
scheduler: ComputeScheduler,
schedulingQuantum: Duration = Duration.ofMinutes(5)
): ComputeService {
- return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(dispatcher, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index b377c3e3..77932545 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.service.internal
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Flavor
@@ -35,27 +36,23 @@ import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.ArrayDeque
import java.util.Deque
import java.util.Random
import java.util.UUID
-import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param coroutineContext The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- coroutineContext: CoroutineContext,
- private val clock: Clock,
+ private val dispatcher: Dispatcher,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
@@ -108,6 +105,7 @@ internal class ComputeServiceImpl(
override val hosts: Set<Host>
get() = hostToView.keys
+ private val clock = dispatcher.timeSource
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -120,7 +118,7 @@ internal class ComputeServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
check(!isClosed) { "Service is already closed" }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 233f5ef6..0840ba7e 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,7 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
-import java.util.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
import kotlin.math.min
/**
@@ -39,13 +40,13 @@ import kotlin.math.min
* @param filters The list of filters to apply when searching for an appropriate host.
* @param weighers The list of weighers to apply when searching for an appropriate host.
* @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
- * @param random A [Random] instance for selecting
+ * @param random A [RandomGenerator] instance for selecting
*/
public class FilterScheduler(
private val filters: List<HostFilter>,
private val weighers: List<HostWeigher>,
private val subsetSize: Int = 1,
- private val random: Random = Random(0)
+ private val random: RandomGenerator = SplittableRandom(0)
) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index c18709f3..b5685aba 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -62,12 +62,11 @@ internal class ComputeServiceTest {
@BeforeEach
fun setUp() {
scope = SimulationCoroutineScope()
- val clock = scope.clock
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- service = ComputeService(scope.coroutineContext, clock, computeScheduler)
+ service = ComputeService(scope.dispatcher, computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 5eccc6ec..a44ccc27 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -45,9 +45,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloads
-import java.time.Clock
import java.time.Duration
import java.time.Instant
+import java.time.InstantSource
import java.util.UUID
import java.util.function.Supplier
@@ -68,7 +68,7 @@ public class SimHost(
override val uid: UUID,
override val name: String,
override val meta: Map<String, Any>,
- private val clock: Clock,
+ private val clock: InstantSource,
private val machine: SimBareMetalMachine,
private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper = DefaultWorkloadMapper,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
index 258ccc89..d34f70d7 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.simulator.failure
import org.opendc.compute.simulator.SimHost
-import java.time.Clock
+import java.time.InstantSource
/**
* Interface responsible for applying the fault to a host.
@@ -32,5 +32,5 @@ public interface HostFault {
/**
* Apply the fault to the specified [victims].
*/
- public suspend fun apply(clock: Clock, victims: List<SimHost>)
+ public suspend fun apply(clock: InstantSource, victims: List<SimHost>)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
index 5eff439f..afbb99d2 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
@@ -26,6 +26,7 @@ import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.internal.HostFaultInjectorImpl
import java.time.Clock
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
/**
@@ -55,7 +56,7 @@ public interface HostFaultInjector : AutoCloseable {
*/
public operator fun invoke(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
hosts: Set<SimHost>,
iat: RealDistribution,
selector: VictimSelector,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
index fc7cebfc..8bd25391 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
@@ -25,14 +25,14 @@ package org.opendc.compute.simulator.failure
import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
-import java.time.Clock
+import java.time.InstantSource
import kotlin.math.roundToLong
/**
* A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
*/
public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
- override suspend fun apply(clock: Clock, victims: List<SimHost>) {
+ override suspend fun apply(clock: InstantSource, victims: List<SimHost>) {
for (host in victims) {
host.fail()
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index b6d466bd..4aba0e91 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -24,7 +24,9 @@ package org.opendc.compute.simulator.failure
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
-import java.util.Random
+import java.util.ArrayList
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
import kotlin.math.roundToInt
/**
@@ -32,12 +34,30 @@ import kotlin.math.roundToInt
*/
public class StochasticVictimSelector(
private val size: RealDistribution,
- private val random: Random = Random(0)
+ private val random: RandomGenerator = SplittableRandom(0)
) : VictimSelector {
override fun select(hosts: Set<SimHost>): List<SimHost> {
val n = size.sample().roundToInt()
- return hosts.shuffled(random).take(n)
+ val result = ArrayList<SimHost>(n)
+
+ val random = random
+ var samplesNeeded = n
+ var remainingHosts = hosts.size
+ val iterator = hosts.iterator()
+
+ while (iterator.hasNext() && samplesNeeded > 0) {
+ val host = iterator.next()
+
+ if (random.nextInt(remainingHosts) < samplesNeeded) {
+ result.add(host)
+ samplesNeeded--
+ }
+
+ remainingHosts--
+ }
+
+ return result
}
override fun toString(): String = "StochasticVictimSelector[$size]"
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index ca947625..02766cb1 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -32,15 +32,15 @@ import org.opendc.compute.simulator.SimWorkloadMapper
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
-import java.time.Clock
import java.time.Duration
import java.time.Instant
+import java.time.InstantSource
/**
* A virtual machine instance that is managed by a [SimHost].
*/
internal class Guest(
- private val clock: Clock,
+ private val clock: InstantSource,
val host: SimHost,
private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
index f03bffe9..afc0b0d4 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFault
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.VictimSelector
-import java.time.Clock
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong
@@ -40,7 +40,7 @@ import kotlin.math.roundToLong
* Internal implementation of the [HostFaultInjector] interface.
*
* @param context The scope to run the fault injector in.
- * @param clock The [Clock] to keep track of simulation time.
+ * @param clock The [InstantSource] to keep track of simulation time.
* @param hosts The set of hosts to inject faults into.
* @param iat The inter-arrival time distribution of the failures (in hours).
* @param selector The [VictimSelector] to select the host victims.
@@ -48,7 +48,7 @@ import kotlin.math.roundToLong
*/
internal class HostFaultInjectorImpl(
private val context: CoroutineContext,
- private val clock: Clock,
+ private val clock: InstantSource,
private val hosts: Set<SimHost>,
private val iat: RealDistribution,
private val selector: VictimSelector,
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index fc581d3e..a496cc99 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -75,7 +75,7 @@ internal class SimHostTest {
fun testSingle() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -85,7 +85,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
@@ -131,7 +131,7 @@ internal class SimHostTest {
{ assertEquals(639, cpuStats.activeTime, "Active time does not match") },
{ assertEquals(2360, cpuStats.idleTime, "Idle time does not match") },
{ assertEquals(56, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1500001, clock.millis()) }
+ { assertEquals(1500001, timeSource.millis()) }
)
}
@@ -142,7 +142,7 @@ internal class SimHostTest {
fun testOvercommitted() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -152,7 +152,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
@@ -218,7 +218,7 @@ internal class SimHostTest {
{ assertEquals(658, cpuStats.activeTime, "Active time does not match") },
{ assertEquals(2341, cpuStats.idleTime, "Idle time does not match") },
{ assertEquals(637, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1500001, clock.millis()) }
+ { assertEquals(1500001, timeSource.millis()) }
)
}
@@ -229,7 +229,7 @@ internal class SimHostTest {
fun testFailure() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -238,7 +238,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
index 90f534e6..29d0b5e7 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
@@ -30,15 +30,15 @@ import org.apache.commons.math3.random.Well19937c
import org.junit.jupiter.api.Test
import org.opendc.compute.simulator.SimHost
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
/**
* Test suite for [HostFaultInjector] class.
*/
-internal class HostFaultInjectorTest {
+class HostFaultInjectorTest {
/**
* Simple test case to test that nothing happens when the injector is not started.
*/
@@ -46,7 +46,7 @@ internal class HostFaultInjectorTest {
fun testInjectorNotStarted() = runSimulation {
val host = mockk<SimHost>(relaxUnitFun = true)
- val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+ val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host))
coVerify(exactly = 0) { host.fail() }
coVerify(exactly = 0) { host.recover() }
@@ -61,7 +61,7 @@ internal class HostFaultInjectorTest {
fun testInjectorStopsMachine() = runSimulation {
val host = mockk<SimHost>(relaxUnitFun = true)
- val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+ val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host))
injector.start()
@@ -83,7 +83,7 @@ internal class HostFaultInjectorTest {
mockk(relaxUnitFun = true)
)
- val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet())
+ val injector = createSimpleInjector(coroutineContext, timeSource, hosts.toSet())
injector.start()
@@ -100,7 +100,7 @@ internal class HostFaultInjectorTest {
/**
* Create a simple start stop fault injector.
*/
- private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector {
+ private fun createSimpleInjector(context: CoroutineContext, clock: InstantSource, hosts: Set<SimHost>): HostFaultInjector {
val rng = Well19937c(0)
val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03)
val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25))
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
index 7fe3a2eb..eae5806e 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
@@ -22,13 +22,12 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
import org.opendc.experiments.ServiceRegistry
import org.opendc.experiments.internal.ServiceRegistryImpl
-import java.time.Clock
import java.util.ArrayDeque
import java.util.SplittableRandom
-import kotlin.coroutines.CoroutineContext
/**
* A helper class to set up the experimental environment in a reproducible manner.
@@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext
* [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared
* down after the simulation completes.
*
- * @param coroutineContext The [CoroutineContext] in which the environment is set up.
- * @param clock The simulation [Clock].
+ * @param dispatcher The [Dispatcher] implementation for scheduling future tasks.
* @param seed A seed for initializing the randomness of the environment.
*/
-public class Provisioner(coroutineContext: CoroutineContext, clock: Clock, seed: Long) : AutoCloseable {
+public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable {
/**
* Implementation of [ProvisioningContext].
*/
private val context = object : ProvisioningContext {
- override val clock: Clock = clock
- override val coroutineContext: CoroutineContext = coroutineContext
+ override val dispatcher: Dispatcher = dispatcher
override val seeder: SplittableRandom = SplittableRandom(seed)
override val registry: MutableServiceRegistry = ServiceRegistryImpl()
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
index 73897315..e53044ce 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
@@ -22,31 +22,26 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
-import java.time.Clock
import java.util.SplittableRandom
-import kotlin.coroutines.CoroutineContext
+import java.util.random.RandomGenerator
/**
* The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as
- * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow
+ * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow
* the provisioning steps to initialize the (simulated) resources.
*/
public interface ProvisioningContext {
/**
- * The [CoroutineContext] in which the provisioner runs.
+ * The [Dispatcher] provided by the provisioner to schedule future events during the simulation.
*/
- public val coroutineContext: CoroutineContext
-
- /**
- * The [Clock] tracking the virtual simulation time.
- */
- public val clock: Clock
+ public val dispatcher: Dispatcher
/**
* A [SplittableRandom] instance used to seed the provisioners.
*/
- public val seeder: SplittableRandom
+ public val seeder: RandomGenerator
/**
* A [MutableServiceRegistry] where the provisioned services are registered.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 3e3d758d..1221f084 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -75,7 +75,7 @@ class CapelinBenchmarks {
fun benchmarkCapelin() = runSimulation {
val serviceDomain = "compute.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0).use { provisioner ->
+ Provisioner(dispatcher, seed = 0).use { provisioner ->
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -87,7 +87,7 @@ class CapelinBenchmarks {
)
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
- service.replay(clock, vms, 0L, interference = true)
+ service.replay(timeSource, vms, 0L, interference = true)
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
index 2c3573dc..2567a4d5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
@@ -64,7 +64,7 @@ public class CapelinRunner(
val serviceDomain = "compute.opendc.org"
val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt"))
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }),
setupHosts(serviceDomain, topology, optimize = true)
@@ -96,7 +96,7 @@ public class CapelinRunner(
null
}
- service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
+ service.replay(timeSource, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
index 0b4cafa6..3a2acbd7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -34,8 +34,9 @@ import org.opendc.simulator.compute.power.CpuPowerModel
import org.opendc.simulator.compute.power.CpuPowerModels
import java.io.File
import java.io.InputStream
-import java.util.Random
+import java.util.SplittableRandom
import java.util.UUID
+import java.util.random.RandomGenerator
import kotlin.math.roundToLong
/**
@@ -49,7 +50,7 @@ private val reader = ClusterSpecReader()
fun clusterTopology(
file: File,
powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0),
- random: Random = Random(0)
+ random: RandomGenerator = SplittableRandom(0)
): List<HostSpec> {
return clusterTopology(reader.read(file), powerModel, random)
}
@@ -60,7 +61,7 @@ fun clusterTopology(
fun clusterTopology(
input: InputStream,
powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0),
- random: Random = Random(0)
+ random: RandomGenerator = SplittableRandom(0)
): List<HostSpec> {
return clusterTopology(reader.read(input), powerModel, random)
}
@@ -68,14 +69,14 @@ fun clusterTopology(
/**
* Construct a topology from the given list of [clusters].
*/
-fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: Random = Random(0)): List<HostSpec> {
+fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List<HostSpec> {
return clusters.flatMap { it.toHostSpecs(random, powerModel) }
}
/**
* Helper method to convert a [ClusterSpec] into a list of [HostSpec]s.
*/
-private fun ClusterSpec.toHostSpecs(random: Random, powerModel: CpuPowerModel): List<HostSpec> {
+private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List<HostSpec> {
val cpuSpeed = cpuSpeed
val memoryPerHost = memCapacityPerHost.roundToLong()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 77b0d09f..7e01bb64 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -94,7 +94,7 @@ class CapelinIntegrationTest {
val topology = createTopology()
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -102,7 +102,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed)
+ service.replay(timeSource, workload, seed)
}
println(
@@ -138,7 +138,7 @@ class CapelinIntegrationTest {
val topology = createTopology("single")
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -146,7 +146,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed)
+ service.replay(timeSource, workload, seed)
}
println(
@@ -177,7 +177,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(1.0, seed)
val topology = createTopology("single")
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -185,7 +185,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed, interference = true)
+ service.replay(timeSource, workload, seed, interference = true)
}
println(
@@ -216,7 +216,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(0.25, seed)
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -224,7 +224,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7)))
+ service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7)))
}
// Note that these values have been verified beforehand
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
index bbc70489..125ba6ef 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
@@ -34,12 +34,13 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import java.util.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
/**
* Create a [ComputeScheduler] for the experiment.
*/
-public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
+public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler {
val cpuAllocationRatio = 16.0
val ramAllocationRatio = 1.5
return when (name) {
@@ -79,7 +80,7 @@ public fun createComputeScheduler(name: String, seeder: Random, placements: Map<
filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
weighers = emptyList(),
subsetSize = Int.MAX_VALUE,
- random = Random(seeder.nextLong())
+ random = SplittableRandom(seeder.nextLong())
)
"replay" -> ReplayScheduler(placements)
else -> throw IllegalArgumentException("Unknown policy $name")
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
index 38cbf2dc..d7347327 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
@@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor(
private val schedulingQuantum: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum)
+ val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum)
ctx.registry.register(serviceDomain, ComputeService::class.java, service)
return AutoCloseable { service.close() }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
index 2200880d..b7884293 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.compute
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* An interface that describes how a workload is resolved.
@@ -31,5 +31,5 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+ public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine>
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
index 81a5cf33..eb85dbb8 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
@@ -24,8 +24,8 @@ package org.opendc.experiments.compute
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.HostFaultInjector
-import java.time.Clock
-import java.util.Random
+import java.time.InstantSource
+import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
/**
@@ -37,8 +37,8 @@ public interface FailureModel {
*/
public fun createInjector(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
service: ComputeService,
- random: Random
+ random: RandomGenerator
): HostFaultInjector
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
index ff747066..679e370a 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
@@ -31,9 +31,9 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.StartStopHostFault
import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import java.time.Clock
import java.time.Duration
-import java.util.Random
+import java.time.InstantSource
+import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
@@ -47,9 +47,9 @@ public fun grid5000(failureInterval: Duration): FailureModel {
return object : FailureModel {
override fun createInjector(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
service: ComputeService,
- random: Random
+ random: RandomGenerator
): HostFaultInjector {
val rng = Well19937c(random.nextLong())
val hosts = service.hosts.map { it as SimHost }.toSet()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
index e224fb84..310aa54c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
@@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor(
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock)
+ val engine = FlowEngine.create(ctx.dispatcher)
val graph = engine.newGraph()
val hosts = mutableSetOf<SimHost>()
@@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor(
spec.uid,
spec.name,
spec.meta,
- ctx.clock,
+ ctx.dispatcher.timeSource,
machine,
hypervisor,
optimize = optimize
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
index f0e31932..16d28edb 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -29,7 +29,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
-import java.time.Clock
+import java.time.InstantSource
import java.util.Random
import kotlin.coroutines.coroutineContext
import kotlin.math.max
@@ -45,7 +45,7 @@ import kotlin.math.max
* @param interference A flag to indicate that VM interference needs to be enabled.
*/
public suspend fun ComputeService.replay(
- clock: Clock,
+ clock: InstantSource,
trace: List<VirtualMachine>,
seed: Long,
submitImmediately: Boolean = false,
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
index 3a7a51f2..ca23a7c5 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
@@ -26,7 +26,7 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
@@ -37,7 +37,7 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
index a6055762..583405da 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
@@ -26,8 +26,8 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
import java.util.UUID
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that samples HPC VMs in the workload.
@@ -46,7 +46,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val vms = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
@@ -58,7 +58,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
.map { index ->
val res = mutableListOf<VirtualMachine>()
hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
res
}
.flatten()
@@ -67,7 +66,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
.map { index ->
val res = mutableListOf<VirtualMachine>()
nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
res
}
.flatten()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
index 793f1de9..ffb7e0c6 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
@@ -26,7 +26,7 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that is sampled based on total load.
@@ -37,7 +37,7 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val vms = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
index b4e9005f..d9e311cd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
@@ -25,13 +25,13 @@ package org.opendc.experiments.compute.internal
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
return loader.get(name, format)
}
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index ac058171..efd38a3c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -27,6 +27,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
@@ -35,7 +37,6 @@ import org.opendc.experiments.compute.telemetry.table.HostTableReader
import org.opendc.experiments.compute.telemetry.table.ServerInfo
import org.opendc.experiments.compute.telemetry.table.ServerTableReader
import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
-import java.time.Clock
import java.time.Duration
import java.time.Instant
@@ -43,20 +44,20 @@ import java.time.Instant
* A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
* export interval.
*
- * @param scope The [CoroutineScope] to run the reader in.
- * @param clock The virtual clock.
+ * @param dispatcher A [Dispatcher] for scheduling the future events.
* @param service The [ComputeService] to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
public class ComputeMetricReader(
- scope: CoroutineScope,
- clock: Clock,
+ dispatcher: Dispatcher,
private val service: ComputeService,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
+ private val clock = dispatcher.timeSource
/**
* Aggregator for service metrics.
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
index 68ca5ae8..665611dd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
@@ -22,9 +22,6 @@
package org.opendc.experiments.compute.telemetry
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import org.opendc.compute.service.ComputeService
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
@@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor(
private val exportInterval: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val scope = CoroutineScope(ctx.coroutineContext + Job())
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval)
-
- return AutoCloseable {
- metricReader.close()
- scope.cancel()
- }
+ val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval)
+ return AutoCloseable { metricReader.close() }
}
}
diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
index 3b4200c8..e5c2f86a 100644
--- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
@@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor(
} else {
ZeroDelayInjector
}
- val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector)
+ val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector)
val service = FaaSService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
deployer,
routingPolicy(ctx),
terminationPolicy(ctx)
diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
index c4001e2e..7a354d69 100644
--- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
@@ -28,16 +28,16 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.faas.service.FaaSService
-import java.time.Clock
+import java.time.InstantSource
import kotlin.math.max
/**
* Run a simulation of the [FaaSService] by replaying the workload trace given by [trace].
*
- * @param clock A [Clock] instance tracking simulation time.
+ * @param clock An [InstantSource] instance tracking simulation time.
* @param trace The trace to simulate.
*/
-public suspend fun FaaSService.replay(clock: Clock, trace: List<FunctionTrace>) {
+public suspend fun FaaSService.replay(clock: InstantSource, trace: List<FunctionTrace>) {
val client = newClient()
try {
coroutineScope {
diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
index 1ad9c57f..4a4d9ae0 100644
--- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
@@ -49,12 +49,12 @@ class FaaSExperiment {
fun testSmoke() = runSimulation {
val faasService = "faas.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
provisioner.runStep(
setupFaaSService(
faasService,
{ RandomRoutingPolicy() },
- { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) },
+ { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) },
createMachineModel(),
coldStartModel = ColdStartModel.GOOGLE
)
@@ -63,7 +63,7 @@ class FaaSExperiment {
val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!!
val trace = ServerlessTraceReader().parse(File("src/test/resources/trace"))
- service.replay(clock, trace)
+ service.replay(timeSource, trace)
val stats = service.getSchedulerStats()
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index eb308970..53bf5aa6 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,12 +22,9 @@
package org.opendc.experiments.tf20.core
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
@@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.CpuPowerModel
-import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow2.FlowEngine
import org.opendc.simulator.flow2.FlowStage
import org.opendc.simulator.flow2.FlowStageLogic
import org.opendc.simulator.flow2.OutPort
-import java.time.Clock
import java.util.ArrayDeque
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.math.ceil
import kotlin.math.roundToLong
@@ -57,22 +51,16 @@ import kotlin.math.roundToLong
public class SimTFDevice(
override val uid: UUID,
override val isGpu: Boolean,
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
pu: ProcessingUnit,
private val memory: MemoryUnit,
powerModel: CpuPowerModel
) : TFDevice {
/**
- * The scope in which the device runs.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine.create(
- FlowEngine.create(context, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
MachineModel(listOf(pu), listOf(memory)),
SimPsuFactories.simple(powerModel)
)
@@ -162,9 +150,7 @@ public class SimTFDevice(
}
init {
- scope.launch {
- machine.runWorkload(workload)
- }
+ machine.startWorkload(workload, emptyMap()) {}
}
override suspend fun load(dataSize: Long) {
@@ -185,7 +171,6 @@ public class SimTFDevice(
override fun close() {
machine.cancel()
- scope.cancel()
}
private data class Work(var flops: Double, val cont: Continuation<Unit>) {
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
index 7d65a674..5b408fb3 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
@@ -23,19 +23,18 @@
package org.opendc.experiments.tf20.network
import kotlinx.coroutines.channels.Channel
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
/**
* The network controller represents a simple network model between the worker and master nodes during
* TensorFlow execution.
*/
-public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCloseable {
+public class NetworkController(dispatcher: Dispatcher) : AutoCloseable {
/**
* The scheduler for the message.
*/
- private val scheduler = TimerScheduler<Message>(context, clock)
+ private val scheduler = TimerScheduler<Message>(dispatcher)
/**
* The outbound communication channels.
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
index 32f72686..899aafc0 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
@@ -48,8 +48,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -67,7 +66,7 @@ class TensorFlowTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(3309694252, clock.millis()) },
+ { assertEquals(3309694252, timeSource.millis()) },
{ assertEquals(8.27423563E8, stats.energyUsage) }
)
}
@@ -83,8 +82,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -102,7 +100,7 @@ class TensorFlowTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(176230328513, clock.millis()) },
+ { assertEquals(176230328513, timeSource.millis()) },
{ assertEquals(4.405758212825E10, stats.energyUsage) }
)
}
@@ -118,8 +116,7 @@ class TensorFlowTest {
val deviceA = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -128,8 +125,7 @@ class TensorFlowTest {
val deviceB = SimTFDevice(
UUID.randomUUID(),
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -150,7 +146,7 @@ class TensorFlowTest {
val statsA = deviceA.getDeviceStats()
val statsB = deviceB.getDeviceStats()
assertAll(
- { assertEquals(1704994000, clock.millis()) },
+ { assertEquals(1704994000, timeSource.millis()) },
{ assertEquals(4.262485E8, statsA.energyUsage) },
{ assertEquals(4.262485E8, statsB.energyUsage) }
)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
index 910cbcc9..549c6f3e 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
@@ -47,8 +47,7 @@ internal class SimTFDeviceTest {
val device = SimTFDevice(
UUID.randomUUID(),
isGpu = true,
- coroutineContext,
- clock,
+ dispatcher,
pu,
memory,
CpuPowerModels.linear(250.0, 100.0)
@@ -56,7 +55,7 @@ internal class SimTFDeviceTest {
// Load 1 GiB into GPU memory
device.load(1000)
- assertEquals(1140, clock.millis())
+ assertEquals(1140, timeSource.millis())
coroutineScope {
launch { device.compute(1e6) }
@@ -68,7 +67,7 @@ internal class SimTFDeviceTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(3681, clock.millis()) },
+ { assertEquals(3681, timeSource.millis()) },
{ assertEquals(749.25, stats.energyUsage) }
)
}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
index b622362a..2037dad4 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -42,7 +42,7 @@ import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import org.opendc.workflow.service.WorkflowService
-import java.time.Clock
+import java.time.InstantSource
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.collections.HashSet
@@ -110,7 +110,7 @@ public fun Trace.toJobs(): List<Job> {
/**
* Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
*/
-public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) {
+public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) {
// Sort jobs by their arrival time
val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
if (orderedJobs.isEmpty()) {
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
index 5cee9abf..fe4fde17 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor(
val client = computeService.newClient()
val service = WorkflowService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
client,
scheduler.schedulingQuantum,
jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7b40d867..96619cdb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,6 +22,7 @@
package org.opendc.faas.service
+import org.opendc.common.Dispatcher
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -30,9 +31,7 @@ import org.opendc.faas.service.internal.FaaSServiceImpl
import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* The [FaaSService] hosts the service implementation of the OpenDC FaaS platform.
@@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable {
/**
* Construct a new [FaaSService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] used for scheduling events.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
* @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
+ return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index d579ad0c..a2c371e1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,12 +22,11 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
@@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext
* @param timeout The idle timeout after which the function instance is terminated.
*/
public class FunctionTerminationPolicyFixed(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
*/
- private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+ private val scheduler = TimerScheduler<FunctionInstance>(dispatcher)
override fun enqueue(instance: FunctionInstance) {
// Cancel the existing timeout timer
@@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
+ scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 901213af..b1e6b3f5 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,13 +22,11 @@
package org.opendc.faas.service.internal
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
@@ -43,13 +41,12 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.ArrayDeque
import java.util.Random
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException
* this component queues the events to await the deployment of new instances.
*/
internal class FaaSServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy,
quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -80,7 +71,12 @@ internal class FaaSServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() }
+
+ /**
+ * The [InstantSource] instance representing the clock.
+ */
+ private val clock = dispatcher.timeSource
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -266,8 +262,6 @@ internal class FaaSServiceImpl(
}
override fun close() {
- scope.cancel()
-
// Stop all function instances
for ((_, function) in functions) {
function.close()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
index 5bd9d4d3..22bf7266 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
@@ -24,13 +24,15 @@ package org.opendc.faas.service.router
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionInstance
-import kotlin.random.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
/**
* A [RoutingPolicy] that selects a random function instance.
*/
-public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy {
+public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy {
override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance {
- return instances.random(random)
+ val idx = random.nextInt(instances.size)
+ return instances.elementAt(idx)
}
}
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 97ffc5a5..9676744b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,7 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +155,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 307ad5a5..47b4d4fa 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -31,6 +31,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.deployer.FunctionInstance
@@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.flow2.FlowEngine
-import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException
* A [FunctionDeployer] that uses that simulates the [FunctionInstance]s.
*/
public class SimFunctionDeployer(
- context: CoroutineContext,
- private val clock: Clock,
+ private val dispatcher: Dispatcher,
private val model: MachineModel,
private val delayInjector: DelayInjector,
private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper()
@@ -64,7 +63,7 @@ public class SimFunctionDeployer(
/**
* The [CoroutineScope] of this deployer.
*/
- private val scope = CoroutineScope(context + Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job())
override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance {
val instance = Instance(function, listener)
@@ -86,7 +85,7 @@ public class SimFunctionDeployer(
* The machine that will execute the workloads.
*/
public val machine: SimMachine = SimBareMetalMachine.create(
- FlowEngine.create(scope.coroutineContext, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
model
)
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
index d3b31bb9..de7b4aa5 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
@@ -23,13 +23,13 @@
package org.opendc.faas.simulator.delay
import org.opendc.faas.service.deployer.FunctionInstance
-import java.util.Random
+import java.util.random.RandomGenerator
import kotlin.math.abs
/*
* Interface for instance deployment delay estimation.
*/
-public class StochasticDelayInjector(private val model: ColdStartModel, private val random: Random) : DelayInjector {
+public class StochasticDelayInjector(private val model: ColdStartModel, private val random: RandomGenerator) : DelayInjector {
override fun getColdStartDelay(instance: FunctionInstance): Long {
val (mean, sd) = model.coldStartParam(instance.function.memorySize.toInt())
return abs(random.nextGaussian() * sd + mean).toLong()
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 6baee7ea..be133ded 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -73,13 +73,12 @@ internal class SimFaaSServiceTest {
})
val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
- val deployer = SimFunctionDeployer(coroutineContext, clock, machineModel, delayInjector) { workload }
+ val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload }
val service = FaaSService(
- coroutineContext,
- clock,
+ dispatcher,
deployer,
RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
+ FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index ec032070..eea46b95 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -74,7 +74,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkBareMetal() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
return@runSimulation machine.runWorkload(trace.createWorkload(0))
@@ -84,7 +84,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkSpaceSharedHypervisor() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1))
@@ -105,7 +105,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorSingle() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
@@ -126,7 +126,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorDouble() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
index 52d04052..05b40cf8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute;
-import java.time.Clock;
+import java.time.InstantSource;
import org.jetbrains.annotations.NotNull;
import org.opendc.simulator.compute.model.ProcessingUnit;
import org.opendc.simulator.compute.power.CpuPowerModel;
@@ -117,7 +117,7 @@ public class SimPsuFactories {
private final FlowStage stage;
private final OutPort out;
private final CpuPowerModel model;
- private final Clock clock;
+ private final InstantSource clock;
private double targetFreq;
private double totalUsage;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
index 4ebcba71..a1623351 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute.kernel;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -253,7 +253,7 @@ public final class SimHypervisor implements SimWorkload {
private final FlowMultiplexer multiplexer;
private final FlowStage stage;
private final List<ScalingGovernor> scalingGovernors;
- private final Clock clock;
+ private final InstantSource clock;
private final HvCounters counters;
private long lastCounterUpdate;
@@ -526,7 +526,7 @@ public final class SimHypervisor implements SimWorkload {
private final VmInterferenceMember interferenceMember;
private final FlowStage stage;
private final FlowMultiplexer multiplexer;
- private final Clock clock;
+ private final InstantSource clock;
private final List<VCpu> cpus;
private final SimAbstractMachine.Memory memory;
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 2acf6ec7..58b01e06 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -72,7 +72,7 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -83,7 +83,7 @@ class SimMachineTest {
machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
@@ -97,7 +97,7 @@ class SimMachineTest {
}
val trace = builder.build()
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -107,12 +107,12 @@ class SimMachineTest {
machine.runWorkload(trace.createWorkload(0))
// Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000000000, clock.millis())
+ assertEquals(1000000000, timeSource.millis())
}
@Test
fun testDualSocketMachine() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val cpuNode = machineModel.cpus[0].node
@@ -128,12 +128,12 @@ class SimMachineTest {
machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two sockets with two cores execute 2000 MFlOps per second (500 ms)
- assertEquals(500, clock.millis())
+ assertEquals(500, timeSource.millis())
}
@Test
fun testPower() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -156,7 +156,7 @@ class SimMachineTest {
@Test
fun testCapacityClamp() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -184,7 +184,7 @@ class SimMachineTest {
@Test
fun testMemory() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -206,7 +206,7 @@ class SimMachineTest {
@Test
fun testMemoryUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -225,12 +225,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testNetUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -253,12 +253,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(40, clock.millis())
+ assertEquals(40, timeSource.millis())
}
@Test
fun testDiskReadUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -278,12 +278,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testDiskWriteUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -303,12 +303,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testCancellation() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -325,12 +325,12 @@ class SimMachineTest {
// Ignore
}
- assertEquals(0, clock.millis())
+ assertEquals(0, timeSource.millis())
}
@Test
fun testConcurrentRuns() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -351,7 +351,7 @@ class SimMachineTest {
@Test
fun testCatchStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -367,7 +367,7 @@ class SimMachineTest {
@Test
fun testCatchStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -384,7 +384,7 @@ class SimMachineTest {
@Test
fun testCatchShutdownFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -400,7 +400,7 @@ class SimMachineTest {
@Test
fun testCatchNestedFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 79669d40..99f47b2f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -74,7 +74,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -93,7 +93,7 @@ internal class SimFairShareHypervisorTest {
{ assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
{ assertEquals(880219, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
{ assertEquals(28125, hypervisor.counters.cpuStealTime, "Steal time does not match") },
- { assertEquals(1200000, clock.millis()) { "Current time is correct" } }
+ { assertEquals(1200000, timeSource.millis()) { "Current time is correct" } }
)
}
@@ -118,7 +118,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -145,7 +145,7 @@ internal class SimFairShareHypervisorTest {
{ assertEquals(329250, hypervisor.counters.cpuActiveTime, "Active time does not match") },
{ assertEquals(870750, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
{ assertEquals(318750, hypervisor.counters.cpuStealTime, "Steal time does not match") },
- { assertEquals(1200000, clock.millis()) }
+ { assertEquals(1200000, timeSource.millis()) }
)
}
@@ -157,7 +157,7 @@ internal class SimFairShareHypervisorTest {
/*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -184,7 +184,7 @@ internal class SimFairShareHypervisorTest {
.addGroup(setOf("a", "n"), 0.1, 0.8)
.build()
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index d11b91ee..93b67aa3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -89,7 +89,7 @@ internal class SimSpaceSharedHypervisorTest {
hypervisor.removeMachine(vm)
machine.cancel()
- assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" }
+ assertEquals(5 * 60L * 4000, timeSource.millis()) { "Took enough time" }
}
/**
@@ -99,7 +99,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.runtime(duration, 1.0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -113,7 +113,7 @@ internal class SimSpaceSharedHypervisorTest {
machine.cancel()
- assertEquals(duration, clock.millis()) { "Took enough time" }
+ assertEquals(duration, timeSource.millis()) { "Took enough time" }
}
/**
@@ -123,7 +123,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -135,7 +135,7 @@ internal class SimSpaceSharedHypervisorTest {
vm.runWorkload(workload)
machine.cancel()
- assertEquals(duration, clock.millis()) { "Took enough time" }
+ assertEquals(duration, timeSource.millis()) { "Took enough time" }
}
/**
@@ -144,7 +144,7 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runSimulation {
val duration = 5 * 60L * 1000
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -165,7 +165,7 @@ internal class SimSpaceSharedHypervisorTest {
machine.cancel()
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ assertEquals(duration * 2, timeSource.millis()) { "Took enough time" }
}
/**
@@ -173,7 +173,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -200,7 +200,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
index d0b0efaa..08bb6509 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
@@ -59,7 +59,7 @@ class SimChainWorkloadTest {
@Test
fun testMultipleWorkloads() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -75,12 +75,12 @@ class SimChainWorkloadTest {
machine.runWorkload(workload)
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -100,12 +100,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(0, clock.millis())
+ assertEquals(0, timeSource.millis())
}
@Test
fun testStartFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -126,12 +126,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -150,12 +150,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testStopFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -175,12 +175,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testStartAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -201,12 +201,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(2, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testShutdownAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -227,12 +227,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testShutdownAndStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -255,12 +255,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testSnapshot() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -276,10 +276,10 @@ class SimChainWorkloadTest {
job.join()
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
machine.runWorkload(snapshot)
- assertEquals(3500, clock.millis())
+ assertEquals(3500, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index e3b6e6c5..5c888fbc 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -53,7 +53,7 @@ class SimTraceWorkloadTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -71,12 +71,12 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testOffset() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -94,12 +94,12 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(5000, clock.millis())
+ assertEquals(5000, timeSource.millis())
}
@Test
fun testSkipFragment() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -118,12 +118,12 @@ class SimTraceWorkloadTest {
delay(1000L)
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testZeroCores() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -141,6 +141,6 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-core/build.gradle.kts b/opendc-simulator/opendc-simulator-core/build.gradle.kts
index 0de96a8e..0ae95d42 100644
--- a/opendc-simulator/opendc-simulator-core/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-core/build.gradle.kts
@@ -28,5 +28,6 @@ plugins {
}
dependencies {
+ api(projects.opendc.opendcCommon)
api(libs.kotlinx.coroutines)
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java
index 305bdf5e..8c74aacf 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java
@@ -22,17 +22,17 @@
package org.opendc.simulator;
-import java.time.Clock;
import java.time.Instant;
-import java.time.ZoneId;
-import java.util.concurrent.Executor;
+import java.time.InstantSource;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
/**
- * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to
- * skip over delays.
+ * A {@link Dispatcher} used by simulations to manage execution of (future) tasks, providing a controllable (virtual)
+ * clock to skip over delays.
*
* <p>
- * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
+ * The dispatcher can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
* virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as
* possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using
* a single thread.
@@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
* <p>
* This class is not thread-safe and must not be used concurrently by multiple threads.
*/
-public final class SimulationScheduler implements Executor {
+public final class SimulationDispatcher implements Dispatcher {
/**
* The {@link TaskQueue} containing the pending tasks.
*/
@@ -57,36 +57,27 @@ public final class SimulationScheduler implements Executor {
private int count = 0;
/**
- * The {@link Clock} instance linked to this scheduler.
+ * The {@link InstantSource} instance linked to this scheduler.
*/
- private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault());
+ private final SimulationClock timeSource = new SimulationClock(this);
/**
- * Construct a {@link SimulationScheduler} instance with the specified initial time.
+ * Construct a {@link SimulationDispatcher} instance with the specified initial time.
*
* @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch.
*/
- public SimulationScheduler(long initialTimeMs) {
+ public SimulationDispatcher(long initialTimeMs) {
this.currentTime = initialTimeMs;
}
/**
- * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0.
+ * Construct a {@link SimulationDispatcher} instance with the initial time set to UNIX Epoch 0.
*/
- public SimulationScheduler() {
+ public SimulationDispatcher() {
this(0);
}
/**
- * Return the virtual clock associated with this dispatcher.
- *
- * @return A {@link Clock} tracking the virtual time of the dispatcher.
- */
- public Clock getClock() {
- return clock;
- }
-
- /**
* Return the current virtual timestamp of the dispatcher (in milliseconds since epoch).
*
* @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch.
@@ -96,37 +87,30 @@ public final class SimulationScheduler implements Executor {
}
/**
- * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
+ * Return the virtual time source associated with this dispatcher.
*
- * @param delayMs The time from now until the execution of the task (in milliseconds).
- * @param task The task to execute after the delay.
- * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
+ * @return A {@link InstantSource} tracking the virtual time of the dispatcher.
*/
- public int schedule(long delayMs, Runnable task) {
- if (delayMs < 0) {
- throw new IllegalArgumentException(
- "Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
- }
+ @Override
+ public InstantSource getTimeSource() {
+ return timeSource;
+ }
+ @Override
+ public void schedule(long delayMs, Runnable command) {
+ internalSchedule(delayMs, command);
+ }
+
+ @Override
+ public DispatcherHandle scheduleCancellable(long delayMs, Runnable command) {
long target = currentTime + delayMs;
if (target < 0) {
target = Long.MAX_VALUE;
}
- int id = count++;
- queue.add(target, id, task);
- return id;
- }
-
- /**
- * Cancel a pending task.
- *
- * @param deadline The deadline of the task.
- * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}).
- * @return A boolean indicating whether a task was actually cancelled.
- */
- public boolean cancel(long deadline, int id) {
- return queue.remove(deadline, id);
+ long deadline = target;
+ int id = internalSchedule(delayMs, command);
+ return () -> internalCancel(deadline, id);
}
/**
@@ -198,50 +182,62 @@ public final class SimulationScheduler implements Executor {
}
/**
- * Schedule the specified command to run at this moment of virtual time.
+ * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
*
- * @param command The command to execute.
+ * @param delayMs The time from now until the execution of the task (in milliseconds).
+ * @param task The task to execute after the delay.
+ * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
*/
- @Override
- public void execute(Runnable command) {
- schedule(0, command);
+ private int internalSchedule(long delayMs, Runnable task) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException(
+ "Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ int id = count++;
+ queue.add(target, id, task);
+ return id;
}
/**
- * A {@link Clock} implementation for a {@link SimulationScheduler}.
+ * Cancel a pending task.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task (returned by {@link #internalSchedule(long, Runnable)}).
+ * @return A boolean indicating whether a task was actually cancelled.
*/
- private static class SimulationClock extends Clock {
- private final SimulationScheduler scheduler;
- private final ZoneId zone;
-
- SimulationClock(SimulationScheduler scheduler, ZoneId zone) {
- this.scheduler = scheduler;
- this.zone = zone;
- }
+ private boolean internalCancel(long deadline, int id) {
+ return queue.remove(deadline, id);
+ }
- @Override
- public ZoneId getZone() {
- return zone;
- }
+ /**
+ * A {@link InstantSource} implementation for a {@link SimulationDispatcher}.
+ */
+ private static class SimulationClock implements InstantSource {
+ private final SimulationDispatcher dispatcher;
- @Override
- public Clock withZone(ZoneId zoneId) {
- return new SimulationClock(scheduler, zone);
+ SimulationClock(SimulationDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
}
@Override
public Instant instant() {
- return Instant.ofEpochMilli(scheduler.currentTime);
+ return Instant.ofEpochMilli(dispatcher.currentTime);
}
@Override
public long millis() {
- return scheduler.currentTime;
+ return dispatcher.currentTime;
}
@Override
public String toString() {
- return "SimulationClock[time=" + millis() + "ms]";
+ return "SimulationDispatcher.InstantSource[time=" + millis() + "ms]";
}
}
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
index 882a0fc5..6e568137 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
@@ -22,11 +22,18 @@
package org.opendc.simulator.kotlin
+import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
+import kotlinx.coroutines.NonCancellable.children
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
-import org.opendc.simulator.SimulationScheduler
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.opendc.common.DispatcherProvider
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.simulator.SimulationDispatcher
+import java.time.InstantSource
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -54,16 +61,16 @@ import kotlin.coroutines.EmptyCoroutineContext
* The simulation is run in a single thread, unless other [CoroutineDispatcher] are used for child coroutines.
* Because of this, child coroutines are not executed in parallel to [body].
* In order for the spawned-off asynchronous code to actually be executed, one must either [yield] or suspend the
- * body some other way, or use commands that control scheduling (see [SimulationScheduler]).
+ * body some other way, or use commands that control scheduling (see [SimulationDispatcher]).
*/
@OptIn(ExperimentalCoroutinesApi::class)
public fun runSimulation(
context: CoroutineContext = EmptyCoroutineContext,
- scheduler: SimulationScheduler = SimulationScheduler(),
+ scheduler: SimulationDispatcher = SimulationDispatcher(),
body: suspend SimulationCoroutineScope.() -> Unit
) {
- val (safeContext, dispatcher) = context.checkArguments(scheduler)
- val startingJobs = safeContext.activeJobs()
+ val (safeContext, job, dispatcher) = context.checkArguments(scheduler)
+ val startingJobs = job.activeJobs()
val scope = SimulationCoroutineScope(safeContext)
val deferred = scope.async {
body(scope)
@@ -72,7 +79,7 @@ public fun runSimulation(
deferred.getCompletionExceptionOrNull()?.let {
throw it
}
- val endingJobs = safeContext.activeJobs()
+ val endingJobs = job.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
throw IllegalStateException("Test finished with active jobs: $endingJobs")
}
@@ -82,24 +89,51 @@ public fun runSimulation(
* Convenience method for calling [runSimulation] on an existing [SimulationCoroutineScope].
*/
public fun SimulationCoroutineScope.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runSimulation(coroutineContext, scheduler, block)
+ runSimulation(coroutineContext, dispatcher, block)
+
+private fun CoroutineContext.checkArguments(scheduler: SimulationDispatcher): Triple<CoroutineContext, Job, SimulationController> {
+ val job = get(Job) ?: SupervisorJob()
+ val dispatcher = get(ContinuationInterceptor) ?: scheduler.asCoroutineDispatcher()
+ val simulationDispatcher = dispatcher.asSimulationDispatcher()
+ return Triple(this + dispatcher + job, job, simulationDispatcher.asController())
+}
+
+private fun Job.activeJobs(): Set<Job> {
+ return children.filter { it.isActive }.toSet()
+}
/**
- * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineDispatcher].
+ * Convert a [ContinuationInterceptor] into a [SimulationDispatcher] if possible.
*/
-public fun SimulationCoroutineDispatcher.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runSimulation(this, scheduler, block)
-
-private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair<CoroutineContext, SimulationController> {
- val dispatcher = get(ContinuationInterceptor).run {
- this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } }
- this ?: SimulationCoroutineDispatcher(scheduler)
- }
+internal fun ContinuationInterceptor.asSimulationDispatcher(): SimulationDispatcher {
+ val provider = this as? DispatcherProvider ?: throw IllegalArgumentException(
+ "DispatcherProvider such as SimulatorCoroutineDispatcher as the ContinuationInterceptor(Dispatcher) is required"
+ )
- val job = get(Job) ?: SupervisorJob()
- return Pair(this + dispatcher + job, dispatcher as SimulationController)
+ return provider.dispatcher as? SimulationDispatcher ?: throw IllegalArgumentException("Active dispatcher is not a SimulationDispatcher")
}
-private fun CoroutineContext.activeJobs(): Set<Job> {
- return checkNotNull(this[Job]).children.filter { it.isActive }.toSet()
+/**
+ * Helper method to convert a [SimulationDispatcher] into a [SimulationController].
+ */
+internal fun SimulationDispatcher.asController(): SimulationController {
+ return object : SimulationController {
+ override val dispatcher: SimulationDispatcher
+ get() = this@asController
+
+ override val timeSource: InstantSource
+ get() = this@asController.timeSource
+
+ override fun advanceUntilIdle() {
+ dispatcher.advanceUntilIdle()
+ }
+
+ override fun advanceBy(delayMs: Long) {
+ dispatcher.advanceBy(delayMs)
+ }
+
+ override fun runCurrent() {
+ dispatcher.runCurrent()
+ }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
index f96b2326..f7470ad9 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
@@ -23,30 +23,48 @@
package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineDispatcher
-import org.opendc.simulator.SimulationScheduler
-import java.time.Clock
+import org.opendc.simulator.SimulationDispatcher
+import java.time.InstantSource
/**
- * Control the virtual clock of a [CoroutineDispatcher].
+ * Interface to control the virtual clock of a [CoroutineDispatcher].
*/
public interface SimulationController {
/**
* The current virtual clock as it is known to this Dispatcher.
*/
- public val clock: Clock
+ public val timeSource: InstantSource
/**
- * The [SimulationScheduler] driving the simulation.
+ * The current virtual timestamp of the dispatcher (in milliseconds since epoch).
*/
- public val scheduler: SimulationScheduler
+ public val currentTime: Long
+ get() = timeSource.millis()
+
+ /**
+ * Return the [SimulationDispatcher] driving the simulation.
+ */
+ public val dispatcher: SimulationDispatcher
/**
* Immediately execute all pending tasks and advance the virtual clock-time to the last delay.
*
* If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle`
* returns.
+ */
+ public fun advanceUntilIdle()
+
+ /**
+ * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the
+ * meantime.
*
- * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds.
+ * @param delayMs The amount of time to move the virtual clock forward (in milliseconds).
+ * @throws IllegalStateException if passed a negative <code>delay</code>.
+ */
+ public fun advanceBy(delayMs: Long)
+
+ /**
+ * Execute the tasks that are scheduled to execute at this moment of virtual time.
*/
- public fun advanceUntilIdle(): Long
+ public fun runCurrent()
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
index 6be8e67a..ca49fc53 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
@@ -24,7 +24,8 @@ package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
-import org.opendc.simulator.SimulationScheduler
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.simulator.SimulationDispatcher
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -34,33 +35,28 @@ import kotlin.coroutines.EmptyCoroutineContext
*/
public interface SimulationCoroutineScope : CoroutineScope, SimulationController
-private class SimulationCoroutineScopeImpl(
- override val coroutineContext: CoroutineContext
-) :
- SimulationCoroutineScope,
- SimulationController by coroutineContext.simulationController
-
/**
* A scope which provides detailed control over the execution of coroutines for simulations.
*
* If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the
- * scope adds [SimulationCoroutineDispatcher] automatically.
+ * scope adds a dispatcher automatically.
*/
-@Suppress("FunctionName")
public fun SimulationCoroutineScope(
context: CoroutineContext = EmptyCoroutineContext,
- scheduler: SimulationScheduler = SimulationScheduler()
+ scheduler: SimulationDispatcher = SimulationDispatcher()
): SimulationCoroutineScope {
var safeContext = context
- if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher(scheduler)
- return SimulationCoroutineScopeImpl(safeContext)
-}
+ val simulationDispatcher: SimulationDispatcher
+ val interceptor = context[ContinuationInterceptor]
-private inline val CoroutineContext.simulationController: SimulationController
- get() {
- val handler = this[ContinuationInterceptor]
- return handler as? SimulationController ?: throw IllegalArgumentException(
- "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " +
- "the ContinuationInterceptor (Dispatcher)"
- )
+ if (interceptor != null) {
+ simulationDispatcher = interceptor.asSimulationDispatcher()
+ } else {
+ simulationDispatcher = scheduler
+ safeContext += scheduler.asCoroutineDispatcher()
}
+
+ return object : SimulationCoroutineScope, SimulationController by simulationDispatcher.asController() {
+ override val coroutineContext: CoroutineContext = safeContext
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt
index eca3b582..600102be 100644
--- a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt
@@ -28,15 +28,15 @@ import org.junit.jupiter.api.assertThrows
import java.time.Instant
/**
- * Test suite for the [SimulationScheduler] class.
+ * Test suite for the [SimulationDispatcher] class.
*/
-class SimulationSchedulerTest {
+class SimulationDispatcherTest {
/**
- * Test the basic functionality of [SimulationScheduler.runCurrent].
+ * Test the basic functionality of [SimulationDispatcher.runCurrent].
*/
@Test
fun testRunCurrent() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -58,11 +58,11 @@ class SimulationSchedulerTest {
}
/**
- * Test the clock of the [SimulationScheduler].
+ * Test the clock of the [SimulationDispatcher].
*/
@Test
fun testClock() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -70,8 +70,8 @@ class SimulationSchedulerTest {
scheduler.advanceBy(2)
assertEquals(2, scheduler.currentTime)
- assertEquals(2, scheduler.clock.millis())
- assertEquals(Instant.ofEpochMilli(2), scheduler.clock.instant())
+ assertEquals(2, scheduler.timeSource.millis())
+ assertEquals(Instant.ofEpochMilli(2), scheduler.timeSource.instant())
}
/**
@@ -79,7 +79,7 @@ class SimulationSchedulerTest {
*/
@Test
fun testAdvanceByLargeDelays() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -87,10 +87,11 @@ class SimulationSchedulerTest {
scheduler.advanceBy(10)
scheduler.schedule(Long.MAX_VALUE) { count += 1 }
+ scheduler.scheduleCancellable(Long.MAX_VALUE) { count += 1 }
scheduler.schedule(100_000_000) { count += 1 }
scheduler.advanceUntilIdle()
- assertEquals(3, count)
+ assertEquals(4, count)
}
/**
@@ -98,7 +99,7 @@ class SimulationSchedulerTest {
*/
@Test
fun testNegativeDelays() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
assertThrows<IllegalArgumentException> { scheduler.schedule(-100) { } }
assertThrows<IllegalArgumentException> { scheduler.advanceBy(-100) }
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt
new file mode 100644
index 00000000..26419a50
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.kotlin
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for the Kotlin simulation builders.
+ */
+class SimulationBuildersTest {
+ @Test
+ fun testDelay() = runSimulation {
+ assertEquals(0, currentTime)
+ delay(100)
+ assertEquals(100, currentTime)
+ }
+
+ @Test
+ fun testController() = runSimulation {
+ var completed = false
+
+ launch {
+ delay(20)
+ completed = true
+ }
+
+ advanceBy(10)
+ assertFalse(completed)
+ advanceBy(11)
+ assertTrue(completed)
+
+ completed = false
+ launch { completed = true }
+ runCurrent()
+ assertTrue(completed)
+ }
+
+ @Test
+ fun testFailOnActiveJobs() {
+ assertThrows<IllegalStateException> {
+ runSimulation {
+ launch { suspendCancellableCoroutine {} }
+ }
+ }
+ }
+
+ @Test
+ fun testPropagateException() {
+ assertThrows<IllegalStateException> {
+ runSimulation {
+ throw IllegalStateException("Test")
+ }
+ }
+ }
+
+ @Test
+ fun testInvalidDispatcher() {
+ assertThrows<IllegalArgumentException> {
+ runSimulation(Dispatchers.Default) { }
+ }
+ }
+
+ @Test
+ fun testExistingJob() {
+ runSimulation(Job()) {
+ delay(10)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 04d46607..4f04bdc1 100644
--- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -28,8 +28,8 @@ plugins {
}
dependencies {
- api(libs.kotlinx.coroutines)
- implementation(libs.kotlin.logging)
+ api(projects.opendc.opendcCommon)
+ implementation(libs.slf4j.api)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(libs.slf4j.simple)
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
index fb112082..59dd3bad 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -60,7 +60,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkSink() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -71,7 +71,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkForward() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -85,7 +85,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinSingleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -103,7 +103,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinTripleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
index 0ebb0da9..c0f52505 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -23,11 +23,11 @@
package org.opendc.simulator.flow2;
import java.time.Clock;
+import java.time.InstantSource;
import java.util.ArrayList;
import java.util.List;
-import kotlin.coroutines.ContinuationInterceptor;
import kotlin.coroutines.CoroutineContext;
-import kotlinx.coroutines.Delay;
+import org.opendc.common.Dispatcher;
/**
* A {@link FlowEngine} simulates a generic flow network.
@@ -56,29 +56,25 @@ public final class FlowEngine implements Runnable {
*/
private boolean active;
- private final CoroutineContext coroutineContext;
- private final Clock clock;
- private final Delay delay;
+ private final Dispatcher dispatcher;
+ private final InstantSource clock;
/**
- * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}.
+ * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}.
*/
- public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) {
- return new FlowEngine(coroutineContext, clock);
+ public static FlowEngine create(Dispatcher dispatcher) {
+ return new FlowEngine(dispatcher);
}
- FlowEngine(CoroutineContext coroutineContext, Clock clock) {
- this.coroutineContext = coroutineContext;
- this.clock = clock;
-
- CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
- this.delay = (Delay) coroutineContext.get(key);
+ FlowEngine(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ this.clock = dispatcher.getTimeSource();
}
/**
* Obtain the (virtual) {@link Clock} driving the simulation.
*/
- public Clock getClock() {
+ public InstantSource getClock() {
return clock;
}
@@ -204,7 +200,7 @@ public final class FlowEngine implements Runnable {
// Only schedule a new scheduler invocation in case the target is earlier than all other pending
// scheduler invocations
if (scheduled.tryAdd(target)) {
- delay.invokeOnTimeout(target - now, this, coroutineContext);
+ dispatcher.schedule(target - now, this);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
index ed5579ea..25f87e04 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -67,7 +67,7 @@ public final class FlowStage {
*/
int timerIndex = -1;
- final Clock clock;
+ final InstantSource clock;
private final FlowStageLogic logic;
final FlowGraphInternal parentGraph;
private final FlowEngine engine;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
index fba12aaf..16fed4eb 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -40,7 +40,7 @@ public final class InPort implements Inlet {
OutPort output;
private InHandler handler = InHandlers.noop();
- private final Clock clock;
+ private final InstantSource clock;
private final String name;
private final FlowStage stage;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
index 332296a0..1f7ed4ee 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -42,7 +42,7 @@ public final class OutPort implements Outlet {
private OutHandler handler = OutHandlers.noop();
private final String name;
private final FlowStage stage;
- private final Clock clock;
+ private final InstantSource clock;
OutPort(FlowStage stage, String name, int id) {
this.name = name;
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
index 839835ce..467bf334 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class FlowEngineTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val multiplexer = MaxMinFlowMultiplexer(graph)
@@ -55,7 +55,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -65,7 +65,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -75,7 +75,7 @@ class FlowEngineTest {
@Test
fun testConnectInletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -87,7 +87,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -99,7 +99,7 @@ class FlowEngineTest {
@Test
fun testConnectInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 2.0f)
@@ -112,7 +112,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -125,7 +125,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -134,7 +134,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -143,7 +143,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -154,7 +154,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -165,7 +165,7 @@ class FlowEngineTest {
@Test
fun testInletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -181,7 +181,7 @@ class FlowEngineTest {
@Test
fun testOutletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
index a2ed2195..fef49786 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest {
*/
@Test
fun testTrace() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = ForwardingFlowMultiplexer(graph)
@@ -60,7 +60,7 @@ class ForwardingFlowMultiplexerTest {
advanceUntilIdle()
assertAll(
- { assertEquals(4000, clock.millis()) { "Took enough time" } }
+ { assertEquals(4000, timeSource.millis()) { "Took enough time" } }
)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
index ba339ee3..ebae2d4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -49,6 +49,6 @@ class MaxMinFlowMultiplexerTest {
advanceUntilIdle()
- assertEquals(500, clock.millis())
+ assertEquals(500, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
index a75efba3..ea516c63 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom
class FlowSinkTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -46,12 +46,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -64,12 +64,12 @@ class FlowSinkTest {
advanceUntilIdle()
- assertEquals(3000, clock.millis())
+ assertEquals(3000, timeSource.millis())
}
@Test
fun testUtilization() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -78,12 +78,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testFragments() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -100,7 +100,7 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
@@ -114,7 +114,7 @@ class FlowSinkTest {
)
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index 8b4ebb89..181d9a20 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -43,7 +43,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSinkTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -56,7 +56,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -66,7 +66,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectCircular() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -77,7 +77,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnectedTarget() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = mockk<SimNetworkPort>(relaxUnitFun = true)
@@ -90,7 +90,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source1 = TestSource(graph)
@@ -107,7 +107,7 @@ class SimNetworkSinkTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -127,7 +127,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index 1507c4a1..4a489478 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSwitchVirtualTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -60,7 +60,7 @@ class SimNetworkSwitchVirtualTest {
@Test
fun testConnectClosedPort() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val switch = SimNetworkSwitchVirtual(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index 6adb0548..f596ca4e 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPduTest {
@Test
fun testZeroOutlets() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -48,7 +48,7 @@ internal class SimPduTest {
@Test
fun testSingleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -62,7 +62,7 @@ internal class SimPduTest {
@Test
fun testDoubleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val pdu = SimPdu(graph)
@@ -78,7 +78,7 @@ internal class SimPduTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 300.0f)
val pdu = SimPdu(graph)
@@ -95,7 +95,7 @@ internal class SimPduTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -110,7 +110,7 @@ internal class SimPduTest {
@Test
fun testOutletClose() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index 03b8182c..03c942b4 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -42,7 +42,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPowerSourceTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -57,7 +57,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -67,7 +67,7 @@ internal class SimPowerSourceTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -87,7 +87,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -102,7 +102,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectAssertion() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -120,7 +120,7 @@ internal class SimPowerSourceTest {
@Test
fun testOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -135,7 +135,7 @@ internal class SimPowerSourceTest {
@Test
fun testInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 0dd7bb05..89fede63 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimUpsTest {
@Test
fun testSingleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val ups = SimUps(graph)
@@ -49,7 +49,7 @@ internal class SimUpsTest {
@Test
fun testDoubleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
@@ -69,7 +69,7 @@ internal class SimUpsTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -84,7 +84,7 @@ internal class SimUpsTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
index 7ba12ed9..d5f509e7 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.power
-import io.mockk.spyk
import org.opendc.simulator.flow2.FlowGraph
import org.opendc.simulator.flow2.FlowStage
import org.opendc.simulator.flow2.FlowStageLogic
@@ -32,8 +31,7 @@ import org.opendc.simulator.flow2.Outlet
* A test inlet.
*/
class TestInlet(graph: FlowGraph) : SimPowerInlet(), FlowStageLogic {
- val logic = spyk(this)
- private val stage = graph.newStage(logic)
+ private val stage = graph.newStage(this)
val flowOutlet = stage.getOutlet("out")
init {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 3aac2630..86c1c521 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -260,7 +260,7 @@ public class OpenDCRunner(
val scenario = scenario
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(
serviceDomain,
@@ -285,7 +285,7 @@ public class OpenDCRunner(
}
// Run workload trace
- service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+ service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference)
val serviceMetrics = service.getSchedulerStats()
logger.debug {
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 2436c387..07b43b6d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -30,9 +31,7 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A service for cloud workflow execution.
@@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable {
/**
* Construct a new [WorkflowService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
- * @param meterProvider The meter provider to use.
+ * @param dispatcher A [Dispatcher] to schedule future events.
* @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
@@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable {
* @param taskOrderPolicy The task order policy to use.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable {
taskOrderPolicy: TaskOrderPolicy
): WorkflowService {
return WorkflowServiceImpl(
- context,
- clock,
+ dispatcher,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index b1780896..01c1f565 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Image
@@ -40,11 +42,10 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.PriorityQueue
import java.util.Queue
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
@@ -52,8 +53,7 @@ import kotlin.coroutines.resume
* Datacenter Scheduling.
*/
public class WorkflowServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val computeClient: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -64,7 +64,12 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job())
+
+ /**
+ * The [InstantSource] representing the clock of this service.
+ */
+ private val clock = dispatcher.timeSource
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -149,7 +154,7 @@ public class WorkflowServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index b165418a..e5e05a92 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
val computeService = "compute.opendc.org"
val workflowService = "workflow.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
val scheduler: (ProvisioningContext) -> ComputeScheduler = {
FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
@@ -103,7 +103,7 @@ internal class WorkflowServiceTest {
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
- service.replay(clock, trace.toJobs())
+ service.replay(timeSource, trace.toJobs())
val metrics = service.getSchedulerStats()
@@ -119,7 +119,7 @@ internal class WorkflowServiceTest {
},
{ assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } }
+ { assertEquals(45977707L, timeSource.millis()) { "Total duration incorrect" } }
)
}
}
diff --git a/site/docs/advanced-guides/toolchain.md b/site/docs/advanced-guides/toolchain.md
index 36efece7..a1735767 100644
--- a/site/docs/advanced-guides/toolchain.md
+++ b/site/docs/advanced-guides/toolchain.md
@@ -22,7 +22,7 @@ Follow the steps below to get it all set up!
## 1. Installing Java
-OpenDC requires a Java installation of version 11 or higher. Make sure to install
+OpenDC requires a Java installation of version 17 or higher. Make sure to install
the [JDK](https://www.oracle.com/technetwork/java/javase/downloads/index.html), not only the JRE (the JDK also includes
a JRE).
diff --git a/site/docs/getting-started/0-installation.md b/site/docs/getting-started/0-installation.md
index 2747c344..9b9b0e82 100644
--- a/site/docs/getting-started/0-installation.md
+++ b/site/docs/getting-started/0-installation.md
@@ -14,7 +14,7 @@ quicker.
1. **Supported Platforms**
OpenDC is actively tested on Windows, macOS and GNU/Linux.
2. **Required Software**
- A Java installation of version 11 or higher is required for OpenDC. You may download the
+ A Java installation of version 17 or higher is required for OpenDC. You may download the
[Java distribution from Oracle](https://www.oracle.com/java/technologies/downloads/) or use the distribution provided
by your package manager.