summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/build.yml6
-rw-r--r--.github/workflows/release.yml6
-rw-r--r--.github/workflows/test-gradle-rc.yml3
-rw-r--r--.github/workflows/test-java-ea.yml3
-rw-r--r--README.md39
-rw-r--r--buildSrc/src/main/kotlin/Libs.kt37
-rw-r--r--buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts4
-rw-r--r--buildSrc/src/main/kotlin/java-conventions.gradle.kts6
-rw-r--r--buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts9
-rw-r--r--buildSrc/src/main/kotlin/testing-conventions.gradle.kts26
-rw-r--r--gradle/libs.versions.toml10
-rw-r--r--gradle/wrapper/gradle-wrapper.jarbin59821 -> 60756 bytes
-rw-r--r--gradle/wrapper/gradle-wrapper.properties2
-rwxr-xr-xgradlew6
-rw-r--r--gradlew.bat14
-rw-r--r--opendc-common/src/main/java/org/opendc/common/Dispatcher.java63
-rw-r--r--opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java33
-rw-r--r--opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java33
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/Pacer.java (renamed from opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt)80
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java256
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt)48
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt32
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt230
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt69
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt18
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt31
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt12
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt7
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt26
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt16
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt12
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt11
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt17
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt11
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt16
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt7
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt2
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt8
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt8
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt11
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt12
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt5
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt6
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt6
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt23
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt7
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt18
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt7
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt3
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt11
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt10
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt24
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt20
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt11
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt4
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt20
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt16
-rw-r--r--opendc-simulator/opendc-simulator-core/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java (renamed from opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java)134
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt74
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt34
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt36
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt (renamed from opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt)23
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt98
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt18
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt4
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt13
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt17
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt6
-rw-r--r--site/docs/advanced-guides/toolchain.md2
-rw-r--r--site/docs/getting-started/0-installation.md2
112 files changed, 1289 insertions, 958 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 0747ad33..96b86233 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -13,10 +13,10 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
- java: [ 11, 17, 18 ]
+ java: [ 17, 19 ]
include:
- os: windows-latest
- java: 18
+ java: 17
steps:
- name: Checkout repository
uses: actions/checkout@v3
@@ -36,7 +36,7 @@ jobs:
cache-read-only: ${{ github.ref != 'refs/heads/main' }}
- name: Publish report
if: always()
- uses: mikepenz/action-junit-report@v3.5.2
+ uses: mikepenz/action-junit-report@v3
with:
check_name: test (Java ${{ matrix.java }})
report_paths: '**/build/test-results/test/TEST-*.xml'
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 2d893edf..4f62843c 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -8,9 +8,6 @@ jobs:
build:
name: Build OpenDC
runs-on: ubuntu-latest
- strategy:
- matrix:
- java: [ 17 ]
steps:
- name: Checkout repository
uses: actions/checkout@v3
@@ -20,13 +17,12 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: ${{ matrix.java }}
+ java-version: 17
- name: Publish with Gradle
uses: gradle/gradle-build-action@v2
with:
arguments: publish
env:
- ORG_GRADLE_PROJECT_signingKeyId: F8134F9C
ORG_GRADLE_PROJECT_signingKey: ${{ secrets.GPG_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.GPG_PASSPHRASE }}
ORG_GRADLE_PROJECT_ossrhUsername: ${{ secrets.OSSRH_USERNAME }}
diff --git a/.github/workflows/test-gradle-rc.yml b/.github/workflows/test-gradle-rc.yml
index a39d36d7..b58bbcb1 100644
--- a/.github/workflows/test-gradle-rc.yml
+++ b/.github/workflows/test-gradle-rc.yml
@@ -11,8 +11,9 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: 18
+ java-version: 19
- uses: gradle/gradle-build-action@v2
with:
+ cache-disabled: true
gradle-version: release-candidate
arguments: build --dry-run # just test build configuration
diff --git a/.github/workflows/test-java-ea.yml b/.github/workflows/test-java-ea.yml
index f3c31928..d0324b1b 100644
--- a/.github/workflows/test-java-ea.yml
+++ b/.github/workflows/test-java-ea.yml
@@ -11,8 +11,9 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: 19-ea
+ java-version: 20-ea
- uses: gradle/gradle-build-action@v2
with:
+ cache-disabled: true
gradle-version: release-candidate
arguments: build
diff --git a/README.md b/README.md
index a633b264..a82848bb 100644
--- a/README.md
+++ b/README.md
@@ -13,42 +13,23 @@ Collaborative Datacenter Simulation and Exploration for Everybody
-----
-OpenDC is a free and open-source platform for datacenter simulation aimed at both research and education.
+This repository is the home of the OpenDC project, a free and open-source platform for cloud datacenter simulation.
-![Datacenter construction in OpenDC](site/src/components/HomepageFeatures/screenshot-construction.png)
+## Latest Release
-Users can construct datacenters (see above) and define portfolios of scenarios (experiments) to see how these
-datacenters perform under different workloads and schedulers (see below).
-
-![Datacenter simulation in OpenDC](site/src/components/HomepageFeatures/screenshot-construction.png))
-
-The simulator is accessible both as a ready-to-use website hosted by us at [app.opendc.org](https://app.opendc.org), and
-as source code that users can run locally on their own machine or via Docker.
-
-To learn more
-about OpenDC, have a look through our paper [OpenDC 2.0](https://atlarge-research.com/pdfs/ccgrid21-opendc-paper.pdf)
-or on our [vision](https://atlarge-research.com/pdfs/opendc-vision17ispdc_cr.pdf).
-
-🛠 OpenDC is a project by the [@Large Research Group](https://atlarge-research.com).
-
-🐟 OpenDC comes bundled
-with [Capelin](https://repository.tudelft.nl/islandora/object/uuid:d6d50861-86a3-4dd3-a13f-42d84db7af66?collection=education)
-, the capacity planning tool for cloud datacenters based on portfolios of what-if scenarios. More information on how to
-use and extend Capelin coming soon!
+- General Availability (GA): [OpenDC v2.0](https://github.com/atlarge-research/opendc/releases/tag/v2.0) (May 10, 2021)
+- Preview (Release Candidate): n/a
## Documentation
-You can find the OpenDC documentation [on the website](https://opendc.org/).
-
-Check out the [Getting Started](https://opendc.org/docs/getting-started.html) page for a quick overview.
-
+You can find the OpenDC documentation [on the website](https://atlarge-research.github.io/opendc/).
The documentation is divided into several sections:
-* [Main Concepts](https://opendc.org/docs/category/getting-started)
-* [Tutorials](https://opendc.org/docs/category/tutorials)
-* [Advanced Guides](https://opendc.org/docs/category/advanced-guides)
-* [Where to Get Support](https://opendc.org/community/support)
-* [Contributing Guide](https://opendc.org/community/contributing)
+* [Getting Started](https://atlarge-research.github.io/opendc/docs/category/getting-started/)
+* [Tutorials](https://atlarge-research.github.io/opendc/docs/category/tutorials/)
+* [Advanced Guides](https://atlarge-research.github.io/opendc/docs/category/advanced-guides/)
+* [Where to Get Support](https://atlarge-research.github.io/opendc/community/support/)
+* [Contributing Guide](https://atlarge-research.github.io/opendc/community/contributing/)
The source code for the documentation is located in [site](site).
diff --git a/buildSrc/src/main/kotlin/Libs.kt b/buildSrc/src/main/kotlin/Libs.kt
index 6a73e1b9..09432b06 100644
--- a/buildSrc/src/main/kotlin/Libs.kt
+++ b/buildSrc/src/main/kotlin/Libs.kt
@@ -24,32 +24,27 @@
import org.gradle.api.JavaVersion
import org.gradle.api.Project
+import org.gradle.api.artifacts.VersionCatalog
import org.gradle.api.artifacts.VersionCatalogsExtension
import org.gradle.kotlin.dsl.getByType
/**
- * This class makes the version catalog accessible for the build scripts until Gradle adds support for it.
- *
- * See https://github.com/gradle/gradle/issues/15383
+ * Obtain the default [VersionCatalog] of the project.
*/
-public class Libs(project: Project) {
- /**
- * The version catalog of the project.
- */
- private val versionCatalog = project.extensions.getByType(VersionCatalogsExtension::class).named("libs")
+public val Project.defaultVersionCatalog: VersionCatalog
+ get() = project.extensions.getByType(VersionCatalogsExtension::class).named("libs")
- /**
- * Obtain the version for the specified [dependency][name].
- */
- operator fun get(name: String): String {
- val dep = versionCatalog.findLibrary(name).get().get()
- return "${dep.module.group}:${dep.module.name}:${dep.versionConstraint.displayName}"
- }
+/**
+ * Obtain the dependency string for the specified [name].
+ */
+public operator fun VersionCatalog.get(name: String): String {
+ val dep = findLibrary(name).get().get()
+ return "${dep.module.group}:${dep.module.name}:${dep.versionConstraint.displayName}"
+}
- companion object {
- /**
- * The JVM version to target.
- */
- val jvmTarget = JavaVersion.VERSION_11
- }
+/**
+ * Obtain the string representation of the version with the given [name].
+ */
+public fun VersionCatalog.getVersion(name: String): String {
+ return findVersion(name).get().displayName
}
diff --git a/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts b/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
index 554d2279..e16733a4 100644
--- a/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
@@ -50,7 +50,7 @@ tasks.named("jmh", JMHTask::class) {
dependencies {
constraints {
- val libs = Libs(project)
- jmh(libs["commons.math3"]) // XXX Force JMH to use the same commons-math3 version as OpenDC
+ val versionCatalog = project.defaultVersionCatalog
+ jmh(versionCatalog["commons.math3"]) // XXX Force JMH to use the same commons-math3 version as OpenDC
}
}
diff --git a/buildSrc/src/main/kotlin/java-conventions.gradle.kts b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
index 8857d4ab..875ae2a7 100644
--- a/buildSrc/src/main/kotlin/java-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
@@ -31,8 +31,10 @@ repositories {
}
java {
- sourceCompatibility = Libs.jvmTarget
- targetCompatibility = Libs.jvmTarget
+ toolchain {
+ val javaVersion = project.defaultVersionCatalog.getVersion("java")
+ languageVersion.set(JavaLanguageVersion.of(javaVersion))
+ }
}
tasks.withType<JavaCompile> {
diff --git a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
index 5ccc06a4..79afdf7c 100644
--- a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
@@ -29,8 +29,15 @@ plugins {
}
/* Project configuration */
+
+kotlin {
+ jvmToolchain {
+ val javaVersion = project.defaultVersionCatalog.getVersion("java")
+ languageVersion.set(JavaLanguageVersion.of(javaVersion))
+ }
+}
+
tasks.withType<KotlinCompile>().configureEach {
- kotlinOptions.jvmTarget = Libs.jvmTarget.toString()
kotlinOptions.freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn"
kotlinOptions.freeCompilerArgs += "-Xjvm-default=all"
}
diff --git a/buildSrc/src/main/kotlin/testing-conventions.gradle.kts b/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
index ebeb58a4..a7fa9da9 100644
--- a/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/testing-conventions.gradle.kts
@@ -34,10 +34,26 @@ tasks.test {
}
dependencies {
- val libs = Libs(project)
+ val versionCatalog = project.defaultVersionCatalog
- testImplementation(libs["junit.jupiter.api"])
- testImplementation(libs["junit.jupiter.params"])
- testImplementation(libs["mockk"])
- testRuntimeOnly(libs["junit.jupiter.engine"])
+ testImplementation(versionCatalog["junit.jupiter.api"])
+ testImplementation(versionCatalog["junit.jupiter.params"])
+ testImplementation(versionCatalog["mockk"])
+ testRuntimeOnly(versionCatalog["junit.jupiter.engine"])
+}
+
+tasks.register<Test>("testsOn18") {
+ javaLauncher.set(javaToolchains.launcherFor {
+ languageVersion.set(JavaLanguageVersion.of(18))
+ })
+
+ useJUnitPlatform()
+}
+
+tasks.register<Test>("testsOn19") {
+ javaLauncher.set(javaToolchains.launcherFor {
+ languageVersion.set(JavaLanguageVersion.of(19))
+ })
+
+ useJUnitPlatform()
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 033e8cfb..89de95b6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -3,16 +3,17 @@ calcite = "1.32.0"
clikt = "3.5.0"
commons-math3 = "3.6.1"
dokka = "1.7.10"
-gradle-node = "3.4.0"
+gradle-node = "3.5.0"
hadoop = "3.3.4"
-jackson = "2.13.4"
+jackson = "2.14.0"
jandex-gradle = "0.13.2"
+java = "17"
jline = "3.21.0"
jmh-gradle = "0.6.8"
jakarta-validation = "2.0.2"
junit-jupiter = "5.9.1"
kotlin = "1.7.20"
-kotlin-logging = "3.0.0"
+kotlin-logging = "3.0.4"
kotlinx-coroutines = "1.6.4"
log4j = "2.19.0"
microprofile-openapi = "3.0"
@@ -21,7 +22,7 @@ mockk = "1.13.2"
node = "18.12.0"
parquet = "1.12.3"
progressbar = "0.9.3"
-quarkus = "2.13.1.Final"
+quarkus = "2.14.0.Final"
quarkus-junit5-mockk = "1.1.1"
sentry = "6.4.3"
slf4j = "2.0.3"
@@ -36,6 +37,7 @@ kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core",
# Logging
kotlin-logging = { module = "io.github.microutils:kotlin-logging", version.ref = "kotlin-logging" }
+slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
log4j-core = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" }
log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" }
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 41d9927a..249e5832 100644
--- a/gradle/wrapper/gradle-wrapper.jar
+++ b/gradle/wrapper/gradle-wrapper.jar
Binary files differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 8049c684..8fab764a 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-rc-3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 1b6c7873..a69d9cb6 100755
--- a/gradlew
+++ b/gradlew
@@ -205,6 +205,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \
"$@"
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
diff --git a/gradlew.bat b/gradlew.bat
index 107acd32..f127cfd4 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
@rem limitations under the License.
@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,7 +25,7 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto execute
+if %ERRORLEVEL% equ 0 goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -75,13 +75,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
diff --git a/opendc-common/src/main/java/org/opendc/common/Dispatcher.java b/opendc-common/src/main/java/org/opendc/common/Dispatcher.java
new file mode 100644
index 00000000..8c919311
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/Dispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+import java.time.InstantSource;
+
+/**
+ * A {@link Dispatcher} is used in OpenDC to schedule the execution of future tasks over potentially multiple threads.
+ */
+public interface Dispatcher {
+ /**
+ * Return the time source of the dispatcher as an {@link InstantSource}.
+ */
+ InstantSource getTimeSource();
+
+ /**
+ * Schedule the specified {@link Runnable} to run as soon as possible.
+ *
+ * @param command The task to execute.
+ */
+ default void schedule(Runnable command) {
+ schedule(0, command);
+ }
+
+ /**
+ * Schedule the specified {@link Runnable} to run after the specified <code>delay</code>.
+ * <p>
+ * Use this method to eliminate potential allocations in case the task does not need to be cancellable.
+ *
+ * @param delayMs The time from now to the delayed execution (in milliseconds).
+ * @param command The task to execute.
+ */
+ void schedule(long delayMs, Runnable command);
+
+ /**
+ * Schedule the specified {@link Runnable} to run after the specified <code>delay</code>.
+ *
+ * @param delayMs The time from now to the delayed execution (in milliseconds).
+ * @param command The task to execute.
+ * @return A {@link DispatcherHandle} representing pending completion of the task.
+ */
+ DispatcherHandle scheduleCancellable(long delayMs, Runnable command);
+}
diff --git a/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java b/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java
new file mode 100644
index 00000000..e34e5e11
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/DispatcherHandle.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+/**
+ * A handle returned by a {@link Dispatcher} representing a scheduled task.
+ */
+public interface DispatcherHandle {
+ /**
+ * Attempt to cancel execution of the task.
+ */
+ void cancel();
+}
diff --git a/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java b/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java
new file mode 100644
index 00000000..2717bd0f
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/DispatcherProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common;
+
+/**
+ * Interface to expose the {@link Dispatcher} instance used by a class.
+ */
+public interface DispatcherProvider {
+ /**
+ * Return the {@link Dispatcher} associated with this class.
+ */
+ Dispatcher getDispatcher();
+}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
index edf607d2..5b8d8cb0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt
+++ b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
@@ -20,75 +20,75 @@
* SOFTWARE.
*/
-package org.opendc.common.util
+package org.opendc.common.util;
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.lang.Runnable
-import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
+import java.util.function.LongConsumer;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
/**
* Helper class to pace the incoming scheduling requests.
- *
- * @param context The [CoroutineContext] in which the pacer runs.
- * @param clock The virtual simulation clock.
- * @param quantum The scheduling quantum.
- * @param process The process to invoke for the incoming requests.
*/
-public class Pacer(
- private val context: CoroutineContext,
- private val clock: Clock,
- private val quantum: Long,
- private val process: (Long) -> Unit
-) {
+public final class Pacer {
+ private final Dispatcher dispatcher;
+ private final long quantumMs;
+ private final LongConsumer process;
+
/**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ * The current {@link DispatcherHandle} representing the pending scheduling cycle.
*/
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+ private DispatcherHandle handle;
/**
- * The current [DisposableHandle] representing the pending scheduling cycle.
+ * Construct a {@link Pacer} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ * @param quantumMs The scheduling quantum in milliseconds.
+ * @param process The process to invoke for the incoming requests.
*/
- private var handle: DisposableHandle? = null
+ public Pacer(Dispatcher dispatcher, long quantumMs, LongConsumer process) {
+ this.dispatcher = dispatcher;
+ this.quantumMs = quantumMs;
+ this.process = process;
+ }
/**
* Determine whether a scheduling cycle is pending.
*/
- public val isPending: Boolean get() = handle != null
+ public boolean isPending() {
+ return handle != null;
+ }
/**
* Enqueue a new scheduling cycle.
*/
- public fun enqueue() {
+ public void enqueue() {
if (handle != null) {
- return
+ return;
}
- val quantum = quantum
- val now = clock.millis()
+ final Dispatcher dispatcher = this.dispatcher;
+ long quantumMs = this.quantumMs;
+ long now = dispatcher.getTimeSource().millis();
// We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// We calculate here the delay until the next scheduling slot.
- val timeUntilNextSlot = quantum - (now % quantum)
+ long timeUntilNextSlot = quantumMs - (now % quantumMs);
- @OptIn(InternalCoroutinesApi::class)
- handle = delay.invokeOnTimeout(timeUntilNextSlot, {
- process(now + timeUntilNextSlot)
- handle = null
- }, context)
+ handle = dispatcher.scheduleCancellable(timeUntilNextSlot, () -> {
+ process.accept(now + timeUntilNextSlot);
+ handle = null;
+ });
}
/**
* Cancel the currently pending scheduling cycle.
*/
- public fun cancel() {
- val handle = handle ?: return
- this.handle = null
- handle.dispose()
+ public void cancel() {
+ final DispatcherHandle handle = this.handle;
+ if (handle != null) {
+ this.handle = null;
+ handle.cancel();
+ }
}
}
diff --git a/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
new file mode 100644
index 00000000..a85605e9
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.util;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.PriorityQueue;
+import org.jetbrains.annotations.NotNull;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
+
+/**
+ * A {@link TimerScheduler} facilitates scheduled execution of future tasks.
+ */
+public final class TimerScheduler<T> {
+ private final Dispatcher dispatcher;
+
+ /**
+ * The stack of the invocations to occur in the future.
+ */
+ private final ArrayDeque<Invocation> invocations = new ArrayDeque<>();
+
+ /**
+ * A priority queue containing the tasks to be scheduled in the future.
+ */
+ private final PriorityQueue<Timer<T>> queue = new PriorityQueue<Timer<T>>();
+
+ /**
+ * A map that keeps track of the timers.
+ */
+ private final HashMap<T, Timer<T>> timers = new HashMap<>();
+
+ /**
+ * Construct a {@link TimerScheduler} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ */
+ public TimerScheduler(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] after [delay].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimer(T key, long delay, Runnable block) {
+ startSingleTimerTo(key, dispatcher.getTimeSource().millis() + delay, block);
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimerTo(T key, long timestamp, Runnable block) {
+ long now = dispatcher.getTimeSource().millis();
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final ArrayDeque<Invocation> invocations = this.invocations;
+
+ if (timestamp < now) {
+ throw new IllegalArgumentException("Timestamp must be in the future");
+ }
+
+ timers.compute(key, (k, old) -> {
+ if (old != null && old.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old.block = block;
+ return old;
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ Timer<T> timer = new Timer<T>(key, timestamp, block);
+
+ if (old != null) {
+ old.isCancelled = true;
+ }
+ queue.add(timer);
+ trySchedule(now, invocations, timestamp);
+
+ return timer;
+ }
+ });
+ }
+
+ /**
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
+ */
+ public boolean isTimerActive(T key) {
+ return timers.containsKey(key);
+ }
+
+ /**
+ * Cancel a timer with a given key.
+ * <p>
+ * If canceling a timer that was already canceled, or key never was used to start
+ * a timer this operation will do nothing.
+ *
+ * @param key The key of the timer to cancel.
+ */
+ public void cancel(T key) {
+ final Timer<T> timer = timers.remove(key);
+
+ // Mark the timer as cancelled
+ if (timer != null) {
+ timer.isCancelled = true;
+ }
+ }
+
+ /**
+ * Cancel all timers.
+ */
+ public void cancelAll() {
+ queue.clear();
+ timers.clear();
+
+ // Cancel all pending invocations
+ for (final Invocation invocation : invocations) {
+ invocation.cancel();
+ }
+
+ invocations.clear();
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private void trySchedule(long now, ArrayDeque<Invocation> scheduled, long target) {
+ final Invocation head = scheduled.peek();
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ final DispatcherHandle handle = dispatcher.scheduleCancellable(target - now, this::doRunTimers);
+ scheduled.addFirst(new Invocation(target, handle));
+ }
+ }
+
+ /**
+ * This method is invoked when the earliest timer expires.
+ */
+ private void doRunTimers() {
+ final ArrayDeque<Invocation> invocations = this.invocations;
+ final Invocation invocation = invocations.remove();
+
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final HashMap<T, Timer<T>> timers = this.timers;
+ long now = invocation.timestamp;
+
+ while (!queue.isEmpty()) {
+ final Timer<T> timer = queue.peek();
+
+ long timestamp = timer.timestamp;
+ boolean isCancelled = timer.isCancelled;
+
+ assert timestamp >= now : "Found task in the past";
+
+ if (timestamp > now && !isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(now, invocations, timestamp);
+ break;
+ }
+
+ queue.poll();
+
+ if (!isCancelled) {
+ timers.remove(timer.key);
+ timer.run();
+ }
+ }
+ }
+
+ /**
+ * A task that is scheduled to run in the future.
+ */
+ private static class Timer<T> implements Comparable<Timer<T>> {
+ final T key;
+ final long timestamp;
+ Runnable block;
+
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ boolean isCancelled;
+
+ /**
+ * Construct a {@link Timer} instance.
+ */
+ public Timer(T key, long timestamp, Runnable block) {
+ this.key = key;
+ this.timestamp = timestamp;
+ this.block = block;
+ }
+
+ /**
+ * Run the task.
+ */
+ void run() {
+ block.run();
+ }
+
+ @Override
+ public int compareTo(@NotNull Timer<T> other) {
+ return Long.compare(timestamp, other.timestamp);
+ }
+ }
+
+ /**
+ * A future engine invocation.
+ * <p>
+ * This class is used to keep track of the future engine invocations created using the {@link Dispatcher} instance.
+ * In case the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private record Invocation(long timestamp, DispatcherHandle handle) {
+ /**
+ * Cancel the engine invocation.
+ */
+ void cancel() {
+ handle.cancel();
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt b/opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt
index cacbbbf7..63744ef9 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/DispatcherCoroutineDispatcher.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.kotlin
+package org.opendc.common
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
@@ -28,61 +28,37 @@ import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
-import org.opendc.simulator.SimulationScheduler
-import java.lang.Runnable
-import java.time.Clock
import kotlin.coroutines.CoroutineContext
/**
- * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual
- * clock for time management.
+ * A [CoroutineDispatcher] that uses a [Dispatcher] to dispatch pending co-routines.
*
- * @param scheduler The [SimulationScheduler] used to manage the execution of future tasks.
+ * @param dispatcher The [Dispatcher] used to manage the execution of future tasks.
*/
@OptIn(InternalCoroutinesApi::class)
-public class SimulationCoroutineDispatcher(
- override val scheduler: SimulationScheduler = SimulationScheduler()
-) : CoroutineDispatcher(), SimulationController, Delay {
- /**
- * The virtual clock of this dispatcher.
- */
- override val clock: Clock = scheduler.clock
-
+internal class DispatcherCoroutineDispatcher(private val dispatcher: Dispatcher) : CoroutineDispatcher(), Delay, DispatcherProvider {
override fun dispatch(context: CoroutineContext, block: Runnable) {
block.run()
}
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
- scheduler.execute(block)
+ dispatcher.schedule(block)
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- scheduler.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) })
+ dispatcher.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) })
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- return object : DisposableHandle {
- private val deadline = (scheduler.currentTime + timeMillis).let { if (it >= 0) it else Long.MAX_VALUE }
- private val id = scheduler.schedule(timeMillis, block)
-
- override fun dispose() {
- scheduler.cancel(deadline, id)
- }
- }
+ val handle = dispatcher.scheduleCancellable(timeMillis, block)
+ return DisposableHandle { handle.cancel() }
}
- override fun toString(): String {
- return "SimulationCoroutineDispatcher[time=${scheduler.currentTime}ms]"
- }
-
- override fun advanceUntilIdle(): Long {
- val scheduler = scheduler
- val oldTime = scheduler.currentTime
+ override fun getDispatcher(): Dispatcher = dispatcher
- scheduler.advanceUntilIdle()
-
- return scheduler.currentTime - oldTime
+ override fun toString(): String {
+ return "DispatcherCoroutineDispatcher[dispatcher=$dispatcher]"
}
/**
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt b/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt
new file mode 100644
index 00000000..6dcd17af
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/Dispatchers.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common
+
+import kotlinx.coroutines.CoroutineDispatcher
+
+/**
+ * Convert a [Dispatcher] to a [CoroutineDispatcher].
+ */
+public fun Dispatcher.asCoroutineDispatcher(): CoroutineDispatcher {
+ return DispatcherCoroutineDispatcher(this)
+}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
deleted file mode 100644
index 44d6010f..00000000
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.common.util
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.time.Clock
-import java.util.ArrayDeque
-import java.util.PriorityQueue
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A TimerScheduler facilitates scheduled execution of future tasks.
- *
- * @param context The [CoroutineContext] to run the tasks with.
- * @param clock The clock to keep track of the time.
- */
-public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The stack of the invocations to occur in the future.
- */
- private val invocations = ArrayDeque<Invocation>()
-
- /**
- * A priority queue containing the tasks to be scheduled in the future.
- */
- private val queue = PriorityQueue<Timer<T>>()
-
- /**
- * A map that keeps track of the timers.
- */
- private val timers = mutableMapOf<T, Timer<T>>()
-
- /**
- * Start a timer that will invoke the specified [block] after [delay].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param delay The delay before invoking the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
- startSingleTimerTo(key, clock.millis() + delay, block)
- }
-
- /**
- * Start a timer that will invoke the specified [block] at [timestamp].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param timestamp The timestamp at which to invoke the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
- val now = clock.millis()
- val queue = queue
- val invocations = invocations
-
- require(timestamp >= now) { "Timestamp must be in the future" }
-
- timers.compute(key) { _, old ->
- if (old != null && old.timestamp == timestamp) {
- // Fast-path: timer for the same timestamp already exists
- old.block = block
- old
- } else {
- // Slow-path: cancel old timer and replace it with new timer
- val timer = Timer(key, timestamp, block)
-
- old?.isCancelled = true
- queue.add(timer)
- trySchedule(now, invocations, timestamp)
-
- timer
- }
- }
- }
-
- /**
- * Check if a timer with a given key is active.
- *
- * @param key The key to check if active.
- * @return `true` if the timer with the specified [key] is active, `false` otherwise.
- */
- public fun isTimerActive(key: T): Boolean = key in timers
-
- /**
- * Cancel a timer with a given key.
- *
- * If canceling a timer that was already canceled, or key never was used to start
- * a timer this operation will do nothing.
- *
- * @param key The key of the timer to cancel.
- */
- public fun cancel(key: T) {
- val timer = timers.remove(key)
-
- // Mark the timer as cancelled
- timer?.isCancelled = true
- }
-
- /**
- * Cancel all timers.
- */
- public fun cancelAll() {
- queue.clear()
- timers.clear()
-
- // Cancel all pending invocations
- for (invocation in invocations) {
- invocation.cancel()
- }
- invocations.clear()
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * This method is invoked when the earliest timer expires.
- */
- private fun doRunTimers() {
- val invocations = invocations
- val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue
- val now = invocation.timestamp
-
- while (queue.isNotEmpty()) {
- val timer = queue.peek()
-
- val timestamp = timer.timestamp
- val isCancelled = timer.isCancelled
-
- assert(timestamp >= now) { "Found task in the past" }
-
- if (timestamp > now && !isCancelled) {
- // Schedule a task for the next event to occur.
- trySchedule(now, invocations, timestamp)
- break
- }
-
- queue.poll()
-
- if (!isCancelled) {
- timers.remove(timer.key)
- timer()
- }
- }
- }
-
- /**
- * A task that is scheduled to run in the future.
- */
- private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> {
- /**
- * A flag to indicate that the task has been cancelled.
- */
- @JvmField
- var isCancelled: Boolean = false
-
- /**
- * Run the task.
- */
- operator fun invoke(): Unit = block()
-
- override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp)
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt
new file mode 100644
index 00000000..af8a7857
--- /dev/null
+++ b/opendc-common/src/test/kotlin/org/opendc/common/DispatcherCoroutineDispatcherTest.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common
+
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for [DispatcherCoroutineDispatcher].
+ */
+class DispatcherCoroutineDispatcherTest {
+ @Test
+ fun testYield() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ val startTime = dispatcher.currentTime
+ yield()
+ assertEquals(startTime, dispatcher.currentTime)
+ }
+ }
+
+ @Test
+ fun testDelay() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ val startTime = dispatcher.currentTime
+ delay(10)
+ assertEquals(startTime + 10, dispatcher.currentTime)
+ }
+ }
+
+ @Test
+ fun testTimeout() = runSimulation {
+ withContext(dispatcher.asCoroutineDispatcher()) {
+ assertThrows<TimeoutCancellationException> {
+ withTimeout(10) {
+ delay(1000)
+ }
+ }
+
+ assertEquals(10, dispatcher.currentTime)
+ }
+ }
+}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
index 3fae2ebc..3235b046 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
@@ -28,26 +28,18 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
-import kotlin.coroutines.EmptyCoroutineContext
/**
* Test suite for the [Pacer] class.
*/
class PacerTest {
@Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} }
- }
-
- @Test
fun testSingleEnqueue() {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -62,7 +54,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -80,7 +72,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -98,7 +90,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -116,7 +108,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
index 22a26111..3947fa2e 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
@@ -29,25 +29,18 @@ import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
-import kotlin.coroutines.EmptyCoroutineContext
/**
* A test suite for the [TimerScheduler] class.
*/
-internal class TimerSchedulerTest {
- @Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) }
- }
-
+class TimerSchedulerTest {
@Test
fun testBasicTimer() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
assertTrue(scheduler.isTimerActive(0))
@@ -58,7 +51,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelNonExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.cancel(1)
}
@@ -67,7 +60,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
fail()
@@ -76,7 +69,7 @@ internal class TimerSchedulerTest {
scheduler.startSingleTimer(1, 100) {
scheduler.cancel(0)
- assertEquals(100, clock.millis())
+ assertEquals(100, timeSource.millis())
}
}
}
@@ -84,7 +77,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelAll() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(1, 100) { fail() }
@@ -95,12 +88,12 @@ internal class TimerSchedulerTest {
@Test
fun testOverride() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 200) {
- assertEquals(200, clock.millis())
+ assertEquals(200, timeSource.millis())
}
}
}
@@ -108,12 +101,12 @@ internal class TimerSchedulerTest {
@Test
fun testOverrideBlock() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 1000) {
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
}
}
@@ -121,7 +114,7 @@ internal class TimerSchedulerTest {
@Test
fun testNegativeDelay() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, clock)
+ val scheduler = TimerScheduler<Int>(dispatcher)
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 85222c10..9d7dcba6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,15 +22,14 @@
package org.opendc.compute.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* The [ComputeService] hosts the API implementation of the OpenDC Compute service.
@@ -80,18 +79,16 @@ public interface ComputeService : AutoCloseable {
/**
* Construct a new [ComputeService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
scheduler: ComputeScheduler,
schedulingQuantum: Duration = Duration.ofMinutes(5)
): ComputeService {
- return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(dispatcher, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index b377c3e3..77932545 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.service.internal
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Flavor
@@ -35,27 +36,23 @@ import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.ArrayDeque
import java.util.Deque
import java.util.Random
import java.util.UUID
-import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param coroutineContext The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- coroutineContext: CoroutineContext,
- private val clock: Clock,
+ private val dispatcher: Dispatcher,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
@@ -108,6 +105,7 @@ internal class ComputeServiceImpl(
override val hosts: Set<Host>
get() = hostToView.keys
+ private val clock = dispatcher.timeSource
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -120,7 +118,7 @@ internal class ComputeServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
check(!isClosed) { "Service is already closed" }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 233f5ef6..0840ba7e 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,7 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
-import java.util.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
import kotlin.math.min
/**
@@ -39,13 +40,13 @@ import kotlin.math.min
* @param filters The list of filters to apply when searching for an appropriate host.
* @param weighers The list of weighers to apply when searching for an appropriate host.
* @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
- * @param random A [Random] instance for selecting
+ * @param random A [RandomGenerator] instance for selecting
*/
public class FilterScheduler(
private val filters: List<HostFilter>,
private val weighers: List<HostWeigher>,
private val subsetSize: Int = 1,
- private val random: Random = Random(0)
+ private val random: RandomGenerator = SplittableRandom(0)
) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index c18709f3..b5685aba 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -62,12 +62,11 @@ internal class ComputeServiceTest {
@BeforeEach
fun setUp() {
scope = SimulationCoroutineScope()
- val clock = scope.clock
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- service = ComputeService(scope.coroutineContext, clock, computeScheduler)
+ service = ComputeService(scope.dispatcher, computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 5eccc6ec..a44ccc27 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -45,9 +45,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloads
-import java.time.Clock
import java.time.Duration
import java.time.Instant
+import java.time.InstantSource
import java.util.UUID
import java.util.function.Supplier
@@ -68,7 +68,7 @@ public class SimHost(
override val uid: UUID,
override val name: String,
override val meta: Map<String, Any>,
- private val clock: Clock,
+ private val clock: InstantSource,
private val machine: SimBareMetalMachine,
private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper = DefaultWorkloadMapper,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
index 258ccc89..d34f70d7 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.simulator.failure
import org.opendc.compute.simulator.SimHost
-import java.time.Clock
+import java.time.InstantSource
/**
* Interface responsible for applying the fault to a host.
@@ -32,5 +32,5 @@ public interface HostFault {
/**
* Apply the fault to the specified [victims].
*/
- public suspend fun apply(clock: Clock, victims: List<SimHost>)
+ public suspend fun apply(clock: InstantSource, victims: List<SimHost>)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
index 5eff439f..afbb99d2 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
@@ -26,6 +26,7 @@ import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.internal.HostFaultInjectorImpl
import java.time.Clock
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
/**
@@ -55,7 +56,7 @@ public interface HostFaultInjector : AutoCloseable {
*/
public operator fun invoke(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
hosts: Set<SimHost>,
iat: RealDistribution,
selector: VictimSelector,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
index fc7cebfc..8bd25391 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
@@ -25,14 +25,14 @@ package org.opendc.compute.simulator.failure
import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
-import java.time.Clock
+import java.time.InstantSource
import kotlin.math.roundToLong
/**
* A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
*/
public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
- override suspend fun apply(clock: Clock, victims: List<SimHost>) {
+ override suspend fun apply(clock: InstantSource, victims: List<SimHost>) {
for (host in victims) {
host.fail()
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index b6d466bd..4aba0e91 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -24,7 +24,9 @@ package org.opendc.compute.simulator.failure
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
-import java.util.Random
+import java.util.ArrayList
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
import kotlin.math.roundToInt
/**
@@ -32,12 +34,30 @@ import kotlin.math.roundToInt
*/
public class StochasticVictimSelector(
private val size: RealDistribution,
- private val random: Random = Random(0)
+ private val random: RandomGenerator = SplittableRandom(0)
) : VictimSelector {
override fun select(hosts: Set<SimHost>): List<SimHost> {
val n = size.sample().roundToInt()
- return hosts.shuffled(random).take(n)
+ val result = ArrayList<SimHost>(n)
+
+ val random = random
+ var samplesNeeded = n
+ var remainingHosts = hosts.size
+ val iterator = hosts.iterator()
+
+ while (iterator.hasNext() && samplesNeeded > 0) {
+ val host = iterator.next()
+
+ if (random.nextInt(remainingHosts) < samplesNeeded) {
+ result.add(host)
+ samplesNeeded--
+ }
+
+ remainingHosts--
+ }
+
+ return result
}
override fun toString(): String = "StochasticVictimSelector[$size]"
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index ca947625..02766cb1 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -32,15 +32,15 @@ import org.opendc.compute.simulator.SimWorkloadMapper
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
-import java.time.Clock
import java.time.Duration
import java.time.Instant
+import java.time.InstantSource
/**
* A virtual machine instance that is managed by a [SimHost].
*/
internal class Guest(
- private val clock: Clock,
+ private val clock: InstantSource,
val host: SimHost,
private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
index f03bffe9..afc0b0d4 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFault
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.VictimSelector
-import java.time.Clock
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong
@@ -40,7 +40,7 @@ import kotlin.math.roundToLong
* Internal implementation of the [HostFaultInjector] interface.
*
* @param context The scope to run the fault injector in.
- * @param clock The [Clock] to keep track of simulation time.
+ * @param clock The [InstantSource] to keep track of simulation time.
* @param hosts The set of hosts to inject faults into.
* @param iat The inter-arrival time distribution of the failures (in hours).
* @param selector The [VictimSelector] to select the host victims.
@@ -48,7 +48,7 @@ import kotlin.math.roundToLong
*/
internal class HostFaultInjectorImpl(
private val context: CoroutineContext,
- private val clock: Clock,
+ private val clock: InstantSource,
private val hosts: Set<SimHost>,
private val iat: RealDistribution,
private val selector: VictimSelector,
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index fc581d3e..a496cc99 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -75,7 +75,7 @@ internal class SimHostTest {
fun testSingle() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -85,7 +85,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
@@ -131,7 +131,7 @@ internal class SimHostTest {
{ assertEquals(639, cpuStats.activeTime, "Active time does not match") },
{ assertEquals(2360, cpuStats.idleTime, "Idle time does not match") },
{ assertEquals(56, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1500001, clock.millis()) }
+ { assertEquals(1500001, timeSource.millis()) }
)
}
@@ -142,7 +142,7 @@ internal class SimHostTest {
fun testOvercommitted() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -152,7 +152,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
@@ -218,7 +218,7 @@ internal class SimHostTest {
{ assertEquals(658, cpuStats.activeTime, "Active time does not match") },
{ assertEquals(2341, cpuStats.idleTime, "Idle time does not match") },
{ assertEquals(637, cpuStats.stealTime, "Steal time does not match") },
- { assertEquals(1500001, clock.millis()) }
+ { assertEquals(1500001, timeSource.millis()) }
)
}
@@ -229,7 +229,7 @@ internal class SimHostTest {
fun testFailure() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -238,7 +238,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- clock,
+ timeSource,
machine,
hypervisor
)
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
index 90f534e6..29d0b5e7 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
@@ -30,15 +30,15 @@ import org.apache.commons.math3.random.Well19937c
import org.junit.jupiter.api.Test
import org.opendc.compute.simulator.SimHost
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
/**
* Test suite for [HostFaultInjector] class.
*/
-internal class HostFaultInjectorTest {
+class HostFaultInjectorTest {
/**
* Simple test case to test that nothing happens when the injector is not started.
*/
@@ -46,7 +46,7 @@ internal class HostFaultInjectorTest {
fun testInjectorNotStarted() = runSimulation {
val host = mockk<SimHost>(relaxUnitFun = true)
- val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+ val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host))
coVerify(exactly = 0) { host.fail() }
coVerify(exactly = 0) { host.recover() }
@@ -61,7 +61,7 @@ internal class HostFaultInjectorTest {
fun testInjectorStopsMachine() = runSimulation {
val host = mockk<SimHost>(relaxUnitFun = true)
- val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+ val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host))
injector.start()
@@ -83,7 +83,7 @@ internal class HostFaultInjectorTest {
mockk(relaxUnitFun = true)
)
- val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet())
+ val injector = createSimpleInjector(coroutineContext, timeSource, hosts.toSet())
injector.start()
@@ -100,7 +100,7 @@ internal class HostFaultInjectorTest {
/**
* Create a simple start stop fault injector.
*/
- private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector {
+ private fun createSimpleInjector(context: CoroutineContext, clock: InstantSource, hosts: Set<SimHost>): HostFaultInjector {
val rng = Well19937c(0)
val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03)
val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25))
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
index 7fe3a2eb..eae5806e 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
@@ -22,13 +22,12 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
import org.opendc.experiments.ServiceRegistry
import org.opendc.experiments.internal.ServiceRegistryImpl
-import java.time.Clock
import java.util.ArrayDeque
import java.util.SplittableRandom
-import kotlin.coroutines.CoroutineContext
/**
* A helper class to set up the experimental environment in a reproducible manner.
@@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext
* [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared
* down after the simulation completes.
*
- * @param coroutineContext The [CoroutineContext] in which the environment is set up.
- * @param clock The simulation [Clock].
+ * @param dispatcher The [Dispatcher] implementation for scheduling future tasks.
* @param seed A seed for initializing the randomness of the environment.
*/
-public class Provisioner(coroutineContext: CoroutineContext, clock: Clock, seed: Long) : AutoCloseable {
+public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable {
/**
* Implementation of [ProvisioningContext].
*/
private val context = object : ProvisioningContext {
- override val clock: Clock = clock
- override val coroutineContext: CoroutineContext = coroutineContext
+ override val dispatcher: Dispatcher = dispatcher
override val seeder: SplittableRandom = SplittableRandom(seed)
override val registry: MutableServiceRegistry = ServiceRegistryImpl()
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
index 73897315..e53044ce 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
@@ -22,31 +22,26 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
-import java.time.Clock
import java.util.SplittableRandom
-import kotlin.coroutines.CoroutineContext
+import java.util.random.RandomGenerator
/**
* The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as
- * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow
+ * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow
* the provisioning steps to initialize the (simulated) resources.
*/
public interface ProvisioningContext {
/**
- * The [CoroutineContext] in which the provisioner runs.
+ * The [Dispatcher] provided by the provisioner to schedule future events during the simulation.
*/
- public val coroutineContext: CoroutineContext
-
- /**
- * The [Clock] tracking the virtual simulation time.
- */
- public val clock: Clock
+ public val dispatcher: Dispatcher
/**
* A [SplittableRandom] instance used to seed the provisioners.
*/
- public val seeder: SplittableRandom
+ public val seeder: RandomGenerator
/**
* A [MutableServiceRegistry] where the provisioned services are registered.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 3e3d758d..1221f084 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -75,7 +75,7 @@ class CapelinBenchmarks {
fun benchmarkCapelin() = runSimulation {
val serviceDomain = "compute.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0).use { provisioner ->
+ Provisioner(dispatcher, seed = 0).use { provisioner ->
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -87,7 +87,7 @@ class CapelinBenchmarks {
)
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
- service.replay(clock, vms, 0L, interference = true)
+ service.replay(timeSource, vms, 0L, interference = true)
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
index 2c3573dc..2567a4d5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
@@ -64,7 +64,7 @@ public class CapelinRunner(
val serviceDomain = "compute.opendc.org"
val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt"))
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }),
setupHosts(serviceDomain, topology, optimize = true)
@@ -96,7 +96,7 @@ public class CapelinRunner(
null
}
- service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
+ service.replay(timeSource, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
index 0b4cafa6..3a2acbd7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -34,8 +34,9 @@ import org.opendc.simulator.compute.power.CpuPowerModel
import org.opendc.simulator.compute.power.CpuPowerModels
import java.io.File
import java.io.InputStream
-import java.util.Random
+import java.util.SplittableRandom
import java.util.UUID
+import java.util.random.RandomGenerator
import kotlin.math.roundToLong
/**
@@ -49,7 +50,7 @@ private val reader = ClusterSpecReader()
fun clusterTopology(
file: File,
powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0),
- random: Random = Random(0)
+ random: RandomGenerator = SplittableRandom(0)
): List<HostSpec> {
return clusterTopology(reader.read(file), powerModel, random)
}
@@ -60,7 +61,7 @@ fun clusterTopology(
fun clusterTopology(
input: InputStream,
powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0),
- random: Random = Random(0)
+ random: RandomGenerator = SplittableRandom(0)
): List<HostSpec> {
return clusterTopology(reader.read(input), powerModel, random)
}
@@ -68,14 +69,14 @@ fun clusterTopology(
/**
* Construct a topology from the given list of [clusters].
*/
-fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: Random = Random(0)): List<HostSpec> {
+fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List<HostSpec> {
return clusters.flatMap { it.toHostSpecs(random, powerModel) }
}
/**
* Helper method to convert a [ClusterSpec] into a list of [HostSpec]s.
*/
-private fun ClusterSpec.toHostSpecs(random: Random, powerModel: CpuPowerModel): List<HostSpec> {
+private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List<HostSpec> {
val cpuSpeed = cpuSpeed
val memoryPerHost = memCapacityPerHost.roundToLong()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 77b0d09f..7e01bb64 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -94,7 +94,7 @@ class CapelinIntegrationTest {
val topology = createTopology()
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -102,7 +102,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed)
+ service.replay(timeSource, workload, seed)
}
println(
@@ -138,7 +138,7 @@ class CapelinIntegrationTest {
val topology = createTopology("single")
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -146,7 +146,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed)
+ service.replay(timeSource, workload, seed)
}
println(
@@ -177,7 +177,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(1.0, seed)
val topology = createTopology("single")
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -185,7 +185,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed, interference = true)
+ service.replay(timeSource, workload, seed, interference = true)
}
println(
@@ -216,7 +216,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(0.25, seed)
val monitor = monitor
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -224,7 +224,7 @@ class CapelinIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7)))
+ service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7)))
}
// Note that these values have been verified beforehand
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
index bbc70489..125ba6ef 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
@@ -34,12 +34,13 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import java.util.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
/**
* Create a [ComputeScheduler] for the experiment.
*/
-public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
+public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler {
val cpuAllocationRatio = 16.0
val ramAllocationRatio = 1.5
return when (name) {
@@ -79,7 +80,7 @@ public fun createComputeScheduler(name: String, seeder: Random, placements: Map<
filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
weighers = emptyList(),
subsetSize = Int.MAX_VALUE,
- random = Random(seeder.nextLong())
+ random = SplittableRandom(seeder.nextLong())
)
"replay" -> ReplayScheduler(placements)
else -> throw IllegalArgumentException("Unknown policy $name")
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
index 38cbf2dc..d7347327 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
@@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor(
private val schedulingQuantum: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum)
+ val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum)
ctx.registry.register(serviceDomain, ComputeService::class.java, service)
return AutoCloseable { service.close() }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
index 2200880d..b7884293 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.compute
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* An interface that describes how a workload is resolved.
@@ -31,5 +31,5 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+ public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine>
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
index 81a5cf33..eb85dbb8 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
@@ -24,8 +24,8 @@ package org.opendc.experiments.compute
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.HostFaultInjector
-import java.time.Clock
-import java.util.Random
+import java.time.InstantSource
+import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
/**
@@ -37,8 +37,8 @@ public interface FailureModel {
*/
public fun createInjector(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
service: ComputeService,
- random: Random
+ random: RandomGenerator
): HostFaultInjector
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
index ff747066..679e370a 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
@@ -31,9 +31,9 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.StartStopHostFault
import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import java.time.Clock
import java.time.Duration
-import java.util.Random
+import java.time.InstantSource
+import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
@@ -47,9 +47,9 @@ public fun grid5000(failureInterval: Duration): FailureModel {
return object : FailureModel {
override fun createInjector(
context: CoroutineContext,
- clock: Clock,
+ clock: InstantSource,
service: ComputeService,
- random: Random
+ random: RandomGenerator
): HostFaultInjector {
val rng = Well19937c(random.nextLong())
val hosts = service.hosts.map { it as SimHost }.toSet()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
index e224fb84..310aa54c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
@@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor(
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock)
+ val engine = FlowEngine.create(ctx.dispatcher)
val graph = engine.newGraph()
val hosts = mutableSetOf<SimHost>()
@@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor(
spec.uid,
spec.name,
spec.meta,
- ctx.clock,
+ ctx.dispatcher.timeSource,
machine,
hypervisor,
optimize = optimize
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
index f0e31932..16d28edb 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -29,7 +29,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
-import java.time.Clock
+import java.time.InstantSource
import java.util.Random
import kotlin.coroutines.coroutineContext
import kotlin.math.max
@@ -45,7 +45,7 @@ import kotlin.math.max
* @param interference A flag to indicate that VM interference needs to be enabled.
*/
public suspend fun ComputeService.replay(
- clock: Clock,
+ clock: InstantSource,
trace: List<VirtualMachine>,
seed: Long,
submitImmediately: Boolean = false,
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
index 3a7a51f2..ca23a7c5 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
@@ -26,7 +26,7 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
@@ -37,7 +37,7 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
index a6055762..583405da 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
@@ -26,8 +26,8 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
import java.util.UUID
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that samples HPC VMs in the workload.
@@ -46,7 +46,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val vms = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
@@ -58,7 +58,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
.map { index ->
val res = mutableListOf<VirtualMachine>()
hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
res
}
.flatten()
@@ -67,7 +66,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
.map { index ->
val res = mutableListOf<VirtualMachine>()
nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
res
}
.flatten()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
index 793f1de9..ffb7e0c6 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
@@ -26,7 +26,7 @@ import mu.KotlinLogging
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] that is sampled based on total load.
@@ -37,7 +37,7 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
val vms = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
index b4e9005f..d9e311cd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
@@ -25,13 +25,13 @@ package org.opendc.experiments.compute.internal
import org.opendc.experiments.compute.ComputeWorkload
import org.opendc.experiments.compute.ComputeWorkloadLoader
import org.opendc.experiments.compute.VirtualMachine
-import java.util.Random
+import java.util.random.RandomGenerator
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
return loader.get(name, format)
}
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index ac058171..efd38a3c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -27,6 +27,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
@@ -35,7 +37,6 @@ import org.opendc.experiments.compute.telemetry.table.HostTableReader
import org.opendc.experiments.compute.telemetry.table.ServerInfo
import org.opendc.experiments.compute.telemetry.table.ServerTableReader
import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
-import java.time.Clock
import java.time.Duration
import java.time.Instant
@@ -43,20 +44,20 @@ import java.time.Instant
* A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
* export interval.
*
- * @param scope The [CoroutineScope] to run the reader in.
- * @param clock The virtual clock.
+ * @param dispatcher A [Dispatcher] for scheduling the future events.
* @param service The [ComputeService] to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
public class ComputeMetricReader(
- scope: CoroutineScope,
- clock: Clock,
+ dispatcher: Dispatcher,
private val service: ComputeService,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
+ private val clock = dispatcher.timeSource
/**
* Aggregator for service metrics.
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
index 68ca5ae8..665611dd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
@@ -22,9 +22,6 @@
package org.opendc.experiments.compute.telemetry
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import org.opendc.compute.service.ComputeService
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
@@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor(
private val exportInterval: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val scope = CoroutineScope(ctx.coroutineContext + Job())
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval)
-
- return AutoCloseable {
- metricReader.close()
- scope.cancel()
- }
+ val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval)
+ return AutoCloseable { metricReader.close() }
}
}
diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
index 3b4200c8..e5c2f86a 100644
--- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
@@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor(
} else {
ZeroDelayInjector
}
- val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector)
+ val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector)
val service = FaaSService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
deployer,
routingPolicy(ctx),
terminationPolicy(ctx)
diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
index c4001e2e..7a354d69 100644
--- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt
@@ -28,16 +28,16 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.faas.service.FaaSService
-import java.time.Clock
+import java.time.InstantSource
import kotlin.math.max
/**
* Run a simulation of the [FaaSService] by replaying the workload trace given by [trace].
*
- * @param clock A [Clock] instance tracking simulation time.
+ * @param clock An [InstantSource] instance tracking simulation time.
* @param trace The trace to simulate.
*/
-public suspend fun FaaSService.replay(clock: Clock, trace: List<FunctionTrace>) {
+public suspend fun FaaSService.replay(clock: InstantSource, trace: List<FunctionTrace>) {
val client = newClient()
try {
coroutineScope {
diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
index 1ad9c57f..4a4d9ae0 100644
--- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
@@ -49,12 +49,12 @@ class FaaSExperiment {
fun testSmoke() = runSimulation {
val faasService = "faas.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
provisioner.runStep(
setupFaaSService(
faasService,
{ RandomRoutingPolicy() },
- { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) },
+ { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) },
createMachineModel(),
coldStartModel = ColdStartModel.GOOGLE
)
@@ -63,7 +63,7 @@ class FaaSExperiment {
val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!!
val trace = ServerlessTraceReader().parse(File("src/test/resources/trace"))
- service.replay(clock, trace)
+ service.replay(timeSource, trace)
val stats = service.getSchedulerStats()
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index eb308970..53bf5aa6 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,12 +22,9 @@
package org.opendc.experiments.tf20.core
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
@@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.CpuPowerModel
-import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow2.FlowEngine
import org.opendc.simulator.flow2.FlowStage
import org.opendc.simulator.flow2.FlowStageLogic
import org.opendc.simulator.flow2.OutPort
-import java.time.Clock
import java.util.ArrayDeque
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.math.ceil
import kotlin.math.roundToLong
@@ -57,22 +51,16 @@ import kotlin.math.roundToLong
public class SimTFDevice(
override val uid: UUID,
override val isGpu: Boolean,
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
pu: ProcessingUnit,
private val memory: MemoryUnit,
powerModel: CpuPowerModel
) : TFDevice {
/**
- * The scope in which the device runs.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine.create(
- FlowEngine.create(context, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
MachineModel(listOf(pu), listOf(memory)),
SimPsuFactories.simple(powerModel)
)
@@ -162,9 +150,7 @@ public class SimTFDevice(
}
init {
- scope.launch {
- machine.runWorkload(workload)
- }
+ machine.startWorkload(workload, emptyMap()) {}
}
override suspend fun load(dataSize: Long) {
@@ -185,7 +171,6 @@ public class SimTFDevice(
override fun close() {
machine.cancel()
- scope.cancel()
}
private data class Work(var flops: Double, val cont: Continuation<Unit>) {
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
index 7d65a674..5b408fb3 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
@@ -23,19 +23,18 @@
package org.opendc.experiments.tf20.network
import kotlinx.coroutines.channels.Channel
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
/**
* The network controller represents a simple network model between the worker and master nodes during
* TensorFlow execution.
*/
-public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCloseable {
+public class NetworkController(dispatcher: Dispatcher) : AutoCloseable {
/**
* The scheduler for the message.
*/
- private val scheduler = TimerScheduler<Message>(context, clock)
+ private val scheduler = TimerScheduler<Message>(dispatcher)
/**
* The outbound communication channels.
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
index 32f72686..899aafc0 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
@@ -48,8 +48,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -67,7 +66,7 @@ class TensorFlowTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(3309694252, clock.millis()) },
+ { assertEquals(3309694252, timeSource.millis()) },
{ assertEquals(8.27423563E8, stats.energyUsage) }
)
}
@@ -83,8 +82,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -102,7 +100,7 @@ class TensorFlowTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(176230328513, clock.millis()) },
+ { assertEquals(176230328513, timeSource.millis()) },
{ assertEquals(4.405758212825E10, stats.energyUsage) }
)
}
@@ -118,8 +116,7 @@ class TensorFlowTest {
val deviceA = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -128,8 +125,7 @@ class TensorFlowTest {
val deviceB = SimTFDevice(
UUID.randomUUID(),
def.meta["gpu"] as Boolean,
- coroutineContext,
- clock,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -150,7 +146,7 @@ class TensorFlowTest {
val statsA = deviceA.getDeviceStats()
val statsB = deviceB.getDeviceStats()
assertAll(
- { assertEquals(1704994000, clock.millis()) },
+ { assertEquals(1704994000, timeSource.millis()) },
{ assertEquals(4.262485E8, statsA.energyUsage) },
{ assertEquals(4.262485E8, statsB.energyUsage) }
)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
index 910cbcc9..549c6f3e 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
@@ -47,8 +47,7 @@ internal class SimTFDeviceTest {
val device = SimTFDevice(
UUID.randomUUID(),
isGpu = true,
- coroutineContext,
- clock,
+ dispatcher,
pu,
memory,
CpuPowerModels.linear(250.0, 100.0)
@@ -56,7 +55,7 @@ internal class SimTFDeviceTest {
// Load 1 GiB into GPU memory
device.load(1000)
- assertEquals(1140, clock.millis())
+ assertEquals(1140, timeSource.millis())
coroutineScope {
launch { device.compute(1e6) }
@@ -68,7 +67,7 @@ internal class SimTFDeviceTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(3681, clock.millis()) },
+ { assertEquals(3681, timeSource.millis()) },
{ assertEquals(749.25, stats.energyUsage) }
)
}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
index b622362a..2037dad4 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -42,7 +42,7 @@ import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import org.opendc.workflow.service.WorkflowService
-import java.time.Clock
+import java.time.InstantSource
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.collections.HashSet
@@ -110,7 +110,7 @@ public fun Trace.toJobs(): List<Job> {
/**
* Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
*/
-public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) {
+public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) {
// Sort jobs by their arrival time
val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
if (orderedJobs.isEmpty()) {
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
index 5cee9abf..fe4fde17 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor(
val client = computeService.newClient()
val service = WorkflowService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
client,
scheduler.schedulingQuantum,
jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7b40d867..96619cdb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,6 +22,7 @@
package org.opendc.faas.service
+import org.opendc.common.Dispatcher
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -30,9 +31,7 @@ import org.opendc.faas.service.internal.FaaSServiceImpl
import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* The [FaaSService] hosts the service implementation of the OpenDC FaaS platform.
@@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable {
/**
* Construct a new [FaaSService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] used for scheduling events.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
* @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
+ return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index d579ad0c..a2c371e1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,12 +22,11 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
@@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext
* @param timeout The idle timeout after which the function instance is terminated.
*/
public class FunctionTerminationPolicyFixed(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
*/
- private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+ private val scheduler = TimerScheduler<FunctionInstance>(dispatcher)
override fun enqueue(instance: FunctionInstance) {
// Cancel the existing timeout timer
@@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
+ scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 901213af..b1e6b3f5 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,13 +22,11 @@
package org.opendc.faas.service.internal
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
@@ -43,13 +41,12 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.ArrayDeque
import java.util.Random
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException
* this component queues the events to await the deployment of new instances.
*/
internal class FaaSServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy,
quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -80,7 +71,12 @@ internal class FaaSServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() }
+
+ /**
+ * The [InstantSource] instance representing the clock.
+ */
+ private val clock = dispatcher.timeSource
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -266,8 +262,6 @@ internal class FaaSServiceImpl(
}
override fun close() {
- scope.cancel()
-
// Stop all function instances
for ((_, function) in functions) {
function.close()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
index 5bd9d4d3..22bf7266 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
@@ -24,13 +24,15 @@ package org.opendc.faas.service.router
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionInstance
-import kotlin.random.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
/**
* A [RoutingPolicy] that selects a random function instance.
*/
-public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy {
+public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy {
override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance {
- return instances.random(random)
+ val idx = random.nextInt(instances.size)
+ return instances.elementAt(idx)
}
}
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 97ffc5a5..9676744b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,7 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +155,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 307ad5a5..47b4d4fa 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -31,6 +31,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.deployer.FunctionInstance
@@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.flow2.FlowEngine
-import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException
* A [FunctionDeployer] that uses that simulates the [FunctionInstance]s.
*/
public class SimFunctionDeployer(
- context: CoroutineContext,
- private val clock: Clock,
+ private val dispatcher: Dispatcher,
private val model: MachineModel,
private val delayInjector: DelayInjector,
private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper()
@@ -64,7 +63,7 @@ public class SimFunctionDeployer(
/**
* The [CoroutineScope] of this deployer.
*/
- private val scope = CoroutineScope(context + Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job())
override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance {
val instance = Instance(function, listener)
@@ -86,7 +85,7 @@ public class SimFunctionDeployer(
* The machine that will execute the workloads.
*/
public val machine: SimMachine = SimBareMetalMachine.create(
- FlowEngine.create(scope.coroutineContext, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
model
)
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
index d3b31bb9..de7b4aa5 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
@@ -23,13 +23,13 @@
package org.opendc.faas.simulator.delay
import org.opendc.faas.service.deployer.FunctionInstance
-import java.util.Random
+import java.util.random.RandomGenerator
import kotlin.math.abs
/*
* Interface for instance deployment delay estimation.
*/
-public class StochasticDelayInjector(private val model: ColdStartModel, private val random: Random) : DelayInjector {
+public class StochasticDelayInjector(private val model: ColdStartModel, private val random: RandomGenerator) : DelayInjector {
override fun getColdStartDelay(instance: FunctionInstance): Long {
val (mean, sd) = model.coldStartParam(instance.function.memorySize.toInt())
return abs(random.nextGaussian() * sd + mean).toLong()
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 6baee7ea..be133ded 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -73,13 +73,12 @@ internal class SimFaaSServiceTest {
})
val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
- val deployer = SimFunctionDeployer(coroutineContext, clock, machineModel, delayInjector) { workload }
+ val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload }
val service = FaaSService(
- coroutineContext,
- clock,
+ dispatcher,
deployer,
RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
+ FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index ec032070..eea46b95 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -74,7 +74,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkBareMetal() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
return@runSimulation machine.runWorkload(trace.createWorkload(0))
@@ -84,7 +84,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkSpaceSharedHypervisor() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1))
@@ -105,7 +105,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorSingle() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
@@ -126,7 +126,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorDouble() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
index 52d04052..05b40cf8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimPsuFactories.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute;
-import java.time.Clock;
+import java.time.InstantSource;
import org.jetbrains.annotations.NotNull;
import org.opendc.simulator.compute.model.ProcessingUnit;
import org.opendc.simulator.compute.power.CpuPowerModel;
@@ -117,7 +117,7 @@ public class SimPsuFactories {
private final FlowStage stage;
private final OutPort out;
private final CpuPowerModel model;
- private final Clock clock;
+ private final InstantSource clock;
private double targetFreq;
private double totalUsage;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
index 4ebcba71..a1623351 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute.kernel;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -253,7 +253,7 @@ public final class SimHypervisor implements SimWorkload {
private final FlowMultiplexer multiplexer;
private final FlowStage stage;
private final List<ScalingGovernor> scalingGovernors;
- private final Clock clock;
+ private final InstantSource clock;
private final HvCounters counters;
private long lastCounterUpdate;
@@ -526,7 +526,7 @@ public final class SimHypervisor implements SimWorkload {
private final VmInterferenceMember interferenceMember;
private final FlowStage stage;
private final FlowMultiplexer multiplexer;
- private final Clock clock;
+ private final InstantSource clock;
private final List<VCpu> cpus;
private final SimAbstractMachine.Memory memory;
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 2acf6ec7..58b01e06 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -72,7 +72,7 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -83,7 +83,7 @@ class SimMachineTest {
machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
@@ -97,7 +97,7 @@ class SimMachineTest {
}
val trace = builder.build()
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -107,12 +107,12 @@ class SimMachineTest {
machine.runWorkload(trace.createWorkload(0))
// Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000000000, clock.millis())
+ assertEquals(1000000000, timeSource.millis())
}
@Test
fun testDualSocketMachine() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val cpuNode = machineModel.cpus[0].node
@@ -128,12 +128,12 @@ class SimMachineTest {
machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two sockets with two cores execute 2000 MFlOps per second (500 ms)
- assertEquals(500, clock.millis())
+ assertEquals(500, timeSource.millis())
}
@Test
fun testPower() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -156,7 +156,7 @@ class SimMachineTest {
@Test
fun testCapacityClamp() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -184,7 +184,7 @@ class SimMachineTest {
@Test
fun testMemory() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -206,7 +206,7 @@ class SimMachineTest {
@Test
fun testMemoryUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -225,12 +225,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testNetUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -253,12 +253,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(40, clock.millis())
+ assertEquals(40, timeSource.millis())
}
@Test
fun testDiskReadUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -278,12 +278,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testDiskWriteUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -303,12 +303,12 @@ class SimMachineTest {
override fun snapshot(): SimWorkload = TODO()
})
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testCancellation() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -325,12 +325,12 @@ class SimMachineTest {
// Ignore
}
- assertEquals(0, clock.millis())
+ assertEquals(0, timeSource.millis())
}
@Test
fun testConcurrentRuns() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -351,7 +351,7 @@ class SimMachineTest {
@Test
fun testCatchStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -367,7 +367,7 @@ class SimMachineTest {
@Test
fun testCatchStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -384,7 +384,7 @@ class SimMachineTest {
@Test
fun testCatchShutdownFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -400,7 +400,7 @@ class SimMachineTest {
@Test
fun testCatchNestedFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 79669d40..99f47b2f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -74,7 +74,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -93,7 +93,7 @@ internal class SimFairShareHypervisorTest {
{ assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
{ assertEquals(880219, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
{ assertEquals(28125, hypervisor.counters.cpuStealTime, "Steal time does not match") },
- { assertEquals(1200000, clock.millis()) { "Current time is correct" } }
+ { assertEquals(1200000, timeSource.millis()) { "Current time is correct" } }
)
}
@@ -118,7 +118,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -145,7 +145,7 @@ internal class SimFairShareHypervisorTest {
{ assertEquals(329250, hypervisor.counters.cpuActiveTime, "Active time does not match") },
{ assertEquals(870750, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
{ assertEquals(318750, hypervisor.counters.cpuStealTime, "Steal time does not match") },
- { assertEquals(1200000, clock.millis()) }
+ { assertEquals(1200000, timeSource.millis()) }
)
}
@@ -157,7 +157,7 @@ internal class SimFairShareHypervisorTest {
/*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -184,7 +184,7 @@ internal class SimFairShareHypervisorTest {
.addGroup(setOf("a", "n"), 0.1, 0.8)
.build()
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index d11b91ee..93b67aa3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -89,7 +89,7 @@ internal class SimSpaceSharedHypervisorTest {
hypervisor.removeMachine(vm)
machine.cancel()
- assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" }
+ assertEquals(5 * 60L * 4000, timeSource.millis()) { "Took enough time" }
}
/**
@@ -99,7 +99,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.runtime(duration, 1.0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -113,7 +113,7 @@ internal class SimSpaceSharedHypervisorTest {
machine.cancel()
- assertEquals(duration, clock.millis()) { "Took enough time" }
+ assertEquals(duration, timeSource.millis()) { "Took enough time" }
}
/**
@@ -123,7 +123,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0)
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -135,7 +135,7 @@ internal class SimSpaceSharedHypervisorTest {
vm.runWorkload(workload)
machine.cancel()
- assertEquals(duration, clock.millis()) { "Took enough time" }
+ assertEquals(duration, timeSource.millis()) { "Took enough time" }
}
/**
@@ -144,7 +144,7 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runSimulation {
val duration = 5 * 60L * 1000
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -165,7 +165,7 @@ internal class SimSpaceSharedHypervisorTest {
machine.cancel()
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ assertEquals(duration * 2, timeSource.millis()) { "Took enough time" }
}
/**
@@ -173,7 +173,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -200,7 +200,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
index d0b0efaa..08bb6509 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
@@ -59,7 +59,7 @@ class SimChainWorkloadTest {
@Test
fun testMultipleWorkloads() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -75,12 +75,12 @@ class SimChainWorkloadTest {
machine.runWorkload(workload)
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -100,12 +100,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(0, clock.millis())
+ assertEquals(0, timeSource.millis())
}
@Test
fun testStartFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -126,12 +126,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -150,12 +150,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testStopFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -175,12 +175,12 @@ class SimChainWorkloadTest {
assertThrows<IllegalStateException> { machine.runWorkload(workload) }
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testStartAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -201,12 +201,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(2, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testShutdownAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -227,12 +227,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testShutdownAndStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -255,12 +255,12 @@ class SimChainWorkloadTest {
val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1, exc.cause!!.suppressedExceptions.size)
- assertEquals(1000, clock.millis())
+ assertEquals(1000, timeSource.millis())
}
@Test
fun testSnapshot() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -276,10 +276,10 @@ class SimChainWorkloadTest {
job.join()
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
machine.runWorkload(snapshot)
- assertEquals(3500, clock.millis())
+ assertEquals(3500, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index e3b6e6c5..5c888fbc 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -53,7 +53,7 @@ class SimTraceWorkloadTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -71,12 +71,12 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testOffset() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -94,12 +94,12 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(5000, clock.millis())
+ assertEquals(5000, timeSource.millis())
}
@Test
fun testSkipFragment() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -118,12 +118,12 @@ class SimTraceWorkloadTest {
delay(1000L)
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testZeroCores() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -141,6 +141,6 @@ class SimTraceWorkloadTest {
machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-core/build.gradle.kts b/opendc-simulator/opendc-simulator-core/build.gradle.kts
index 0de96a8e..0ae95d42 100644
--- a/opendc-simulator/opendc-simulator-core/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-core/build.gradle.kts
@@ -28,5 +28,6 @@ plugins {
}
dependencies {
+ api(projects.opendc.opendcCommon)
api(libs.kotlinx.coroutines)
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java
index 305bdf5e..8c74aacf 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationDispatcher.java
@@ -22,17 +22,17 @@
package org.opendc.simulator;
-import java.time.Clock;
import java.time.Instant;
-import java.time.ZoneId;
-import java.util.concurrent.Executor;
+import java.time.InstantSource;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
/**
- * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to
- * skip over delays.
+ * A {@link Dispatcher} used by simulations to manage execution of (future) tasks, providing a controllable (virtual)
+ * clock to skip over delays.
*
* <p>
- * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
+ * The dispatcher can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
* virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as
* possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using
* a single thread.
@@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
* <p>
* This class is not thread-safe and must not be used concurrently by multiple threads.
*/
-public final class SimulationScheduler implements Executor {
+public final class SimulationDispatcher implements Dispatcher {
/**
* The {@link TaskQueue} containing the pending tasks.
*/
@@ -57,36 +57,27 @@ public final class SimulationScheduler implements Executor {
private int count = 0;
/**
- * The {@link Clock} instance linked to this scheduler.
+ * The {@link InstantSource} instance linked to this scheduler.
*/
- private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault());
+ private final SimulationClock timeSource = new SimulationClock(this);
/**
- * Construct a {@link SimulationScheduler} instance with the specified initial time.
+ * Construct a {@link SimulationDispatcher} instance with the specified initial time.
*
* @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch.
*/
- public SimulationScheduler(long initialTimeMs) {
+ public SimulationDispatcher(long initialTimeMs) {
this.currentTime = initialTimeMs;
}
/**
- * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0.
+ * Construct a {@link SimulationDispatcher} instance with the initial time set to UNIX Epoch 0.
*/
- public SimulationScheduler() {
+ public SimulationDispatcher() {
this(0);
}
/**
- * Return the virtual clock associated with this dispatcher.
- *
- * @return A {@link Clock} tracking the virtual time of the dispatcher.
- */
- public Clock getClock() {
- return clock;
- }
-
- /**
* Return the current virtual timestamp of the dispatcher (in milliseconds since epoch).
*
* @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch.
@@ -96,37 +87,30 @@ public final class SimulationScheduler implements Executor {
}
/**
- * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
+ * Return the virtual time source associated with this dispatcher.
*
- * @param delayMs The time from now until the execution of the task (in milliseconds).
- * @param task The task to execute after the delay.
- * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
+ * @return A {@link InstantSource} tracking the virtual time of the dispatcher.
*/
- public int schedule(long delayMs, Runnable task) {
- if (delayMs < 0) {
- throw new IllegalArgumentException(
- "Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
- }
+ @Override
+ public InstantSource getTimeSource() {
+ return timeSource;
+ }
+ @Override
+ public void schedule(long delayMs, Runnable command) {
+ internalSchedule(delayMs, command);
+ }
+
+ @Override
+ public DispatcherHandle scheduleCancellable(long delayMs, Runnable command) {
long target = currentTime + delayMs;
if (target < 0) {
target = Long.MAX_VALUE;
}
- int id = count++;
- queue.add(target, id, task);
- return id;
- }
-
- /**
- * Cancel a pending task.
- *
- * @param deadline The deadline of the task.
- * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}).
- * @return A boolean indicating whether a task was actually cancelled.
- */
- public boolean cancel(long deadline, int id) {
- return queue.remove(deadline, id);
+ long deadline = target;
+ int id = internalSchedule(delayMs, command);
+ return () -> internalCancel(deadline, id);
}
/**
@@ -198,50 +182,62 @@ public final class SimulationScheduler implements Executor {
}
/**
- * Schedule the specified command to run at this moment of virtual time.
+ * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
*
- * @param command The command to execute.
+ * @param delayMs The time from now until the execution of the task (in milliseconds).
+ * @param task The task to execute after the delay.
+ * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
*/
- @Override
- public void execute(Runnable command) {
- schedule(0, command);
+ private int internalSchedule(long delayMs, Runnable task) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException(
+ "Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ int id = count++;
+ queue.add(target, id, task);
+ return id;
}
/**
- * A {@link Clock} implementation for a {@link SimulationScheduler}.
+ * Cancel a pending task.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task (returned by {@link #internalSchedule(long, Runnable)}).
+ * @return A boolean indicating whether a task was actually cancelled.
*/
- private static class SimulationClock extends Clock {
- private final SimulationScheduler scheduler;
- private final ZoneId zone;
-
- SimulationClock(SimulationScheduler scheduler, ZoneId zone) {
- this.scheduler = scheduler;
- this.zone = zone;
- }
+ private boolean internalCancel(long deadline, int id) {
+ return queue.remove(deadline, id);
+ }
- @Override
- public ZoneId getZone() {
- return zone;
- }
+ /**
+ * A {@link InstantSource} implementation for a {@link SimulationDispatcher}.
+ */
+ private static class SimulationClock implements InstantSource {
+ private final SimulationDispatcher dispatcher;
- @Override
- public Clock withZone(ZoneId zoneId) {
- return new SimulationClock(scheduler, zone);
+ SimulationClock(SimulationDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
}
@Override
public Instant instant() {
- return Instant.ofEpochMilli(scheduler.currentTime);
+ return Instant.ofEpochMilli(dispatcher.currentTime);
}
@Override
public long millis() {
- return scheduler.currentTime;
+ return dispatcher.currentTime;
}
@Override
public String toString() {
- return "SimulationClock[time=" + millis() + "ms]";
+ return "SimulationDispatcher.InstantSource[time=" + millis() + "ms]";
}
}
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
index 882a0fc5..6e568137 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
@@ -22,11 +22,18 @@
package org.opendc.simulator.kotlin
+import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
+import kotlinx.coroutines.NonCancellable.children
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
-import org.opendc.simulator.SimulationScheduler
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.opendc.common.DispatcherProvider
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.simulator.SimulationDispatcher
+import java.time.InstantSource
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -54,16 +61,16 @@ import kotlin.coroutines.EmptyCoroutineContext
* The simulation is run in a single thread, unless other [CoroutineDispatcher] are used for child coroutines.
* Because of this, child coroutines are not executed in parallel to [body].
* In order for the spawned-off asynchronous code to actually be executed, one must either [yield] or suspend the
- * body some other way, or use commands that control scheduling (see [SimulationScheduler]).
+ * body some other way, or use commands that control scheduling (see [SimulationDispatcher]).
*/
@OptIn(ExperimentalCoroutinesApi::class)
public fun runSimulation(
context: CoroutineContext = EmptyCoroutineContext,
- scheduler: SimulationScheduler = SimulationScheduler(),
+ scheduler: SimulationDispatcher = SimulationDispatcher(),
body: suspend SimulationCoroutineScope.() -> Unit
) {
- val (safeContext, dispatcher) = context.checkArguments(scheduler)
- val startingJobs = safeContext.activeJobs()
+ val (safeContext, job, dispatcher) = context.checkArguments(scheduler)
+ val startingJobs = job.activeJobs()
val scope = SimulationCoroutineScope(safeContext)
val deferred = scope.async {
body(scope)
@@ -72,7 +79,7 @@ public fun runSimulation(
deferred.getCompletionExceptionOrNull()?.let {
throw it
}
- val endingJobs = safeContext.activeJobs()
+ val endingJobs = job.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
throw IllegalStateException("Test finished with active jobs: $endingJobs")
}
@@ -82,24 +89,51 @@ public fun runSimulation(
* Convenience method for calling [runSimulation] on an existing [SimulationCoroutineScope].
*/
public fun SimulationCoroutineScope.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runSimulation(coroutineContext, scheduler, block)
+ runSimulation(coroutineContext, dispatcher, block)
+
+private fun CoroutineContext.checkArguments(scheduler: SimulationDispatcher): Triple<CoroutineContext, Job, SimulationController> {
+ val job = get(Job) ?: SupervisorJob()
+ val dispatcher = get(ContinuationInterceptor) ?: scheduler.asCoroutineDispatcher()
+ val simulationDispatcher = dispatcher.asSimulationDispatcher()
+ return Triple(this + dispatcher + job, job, simulationDispatcher.asController())
+}
+
+private fun Job.activeJobs(): Set<Job> {
+ return children.filter { it.isActive }.toSet()
+}
/**
- * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineDispatcher].
+ * Convert a [ContinuationInterceptor] into a [SimulationDispatcher] if possible.
*/
-public fun SimulationCoroutineDispatcher.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runSimulation(this, scheduler, block)
-
-private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair<CoroutineContext, SimulationController> {
- val dispatcher = get(ContinuationInterceptor).run {
- this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } }
- this ?: SimulationCoroutineDispatcher(scheduler)
- }
+internal fun ContinuationInterceptor.asSimulationDispatcher(): SimulationDispatcher {
+ val provider = this as? DispatcherProvider ?: throw IllegalArgumentException(
+ "DispatcherProvider such as SimulatorCoroutineDispatcher as the ContinuationInterceptor(Dispatcher) is required"
+ )
- val job = get(Job) ?: SupervisorJob()
- return Pair(this + dispatcher + job, dispatcher as SimulationController)
+ return provider.dispatcher as? SimulationDispatcher ?: throw IllegalArgumentException("Active dispatcher is not a SimulationDispatcher")
}
-private fun CoroutineContext.activeJobs(): Set<Job> {
- return checkNotNull(this[Job]).children.filter { it.isActive }.toSet()
+/**
+ * Helper method to convert a [SimulationDispatcher] into a [SimulationController].
+ */
+internal fun SimulationDispatcher.asController(): SimulationController {
+ return object : SimulationController {
+ override val dispatcher: SimulationDispatcher
+ get() = this@asController
+
+ override val timeSource: InstantSource
+ get() = this@asController.timeSource
+
+ override fun advanceUntilIdle() {
+ dispatcher.advanceUntilIdle()
+ }
+
+ override fun advanceBy(delayMs: Long) {
+ dispatcher.advanceBy(delayMs)
+ }
+
+ override fun runCurrent() {
+ dispatcher.runCurrent()
+ }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
index f96b2326..f7470ad9 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
@@ -23,30 +23,48 @@
package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineDispatcher
-import org.opendc.simulator.SimulationScheduler
-import java.time.Clock
+import org.opendc.simulator.SimulationDispatcher
+import java.time.InstantSource
/**
- * Control the virtual clock of a [CoroutineDispatcher].
+ * Interface to control the virtual clock of a [CoroutineDispatcher].
*/
public interface SimulationController {
/**
* The current virtual clock as it is known to this Dispatcher.
*/
- public val clock: Clock
+ public val timeSource: InstantSource
/**
- * The [SimulationScheduler] driving the simulation.
+ * The current virtual timestamp of the dispatcher (in milliseconds since epoch).
*/
- public val scheduler: SimulationScheduler
+ public val currentTime: Long
+ get() = timeSource.millis()
+
+ /**
+ * Return the [SimulationDispatcher] driving the simulation.
+ */
+ public val dispatcher: SimulationDispatcher
/**
* Immediately execute all pending tasks and advance the virtual clock-time to the last delay.
*
* If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle`
* returns.
+ */
+ public fun advanceUntilIdle()
+
+ /**
+ * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the
+ * meantime.
*
- * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds.
+ * @param delayMs The amount of time to move the virtual clock forward (in milliseconds).
+ * @throws IllegalStateException if passed a negative <code>delay</code>.
+ */
+ public fun advanceBy(delayMs: Long)
+
+ /**
+ * Execute the tasks that are scheduled to execute at this moment of virtual time.
*/
- public fun advanceUntilIdle(): Long
+ public fun runCurrent()
}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
index 6be8e67a..ca49fc53 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
@@ -24,7 +24,8 @@ package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
-import org.opendc.simulator.SimulationScheduler
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.simulator.SimulationDispatcher
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -34,33 +35,28 @@ import kotlin.coroutines.EmptyCoroutineContext
*/
public interface SimulationCoroutineScope : CoroutineScope, SimulationController
-private class SimulationCoroutineScopeImpl(
- override val coroutineContext: CoroutineContext
-) :
- SimulationCoroutineScope,
- SimulationController by coroutineContext.simulationController
-
/**
* A scope which provides detailed control over the execution of coroutines for simulations.
*
* If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the
- * scope adds [SimulationCoroutineDispatcher] automatically.
+ * scope adds a dispatcher automatically.
*/
-@Suppress("FunctionName")
public fun SimulationCoroutineScope(
context: CoroutineContext = EmptyCoroutineContext,
- scheduler: SimulationScheduler = SimulationScheduler()
+ scheduler: SimulationDispatcher = SimulationDispatcher()
): SimulationCoroutineScope {
var safeContext = context
- if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher(scheduler)
- return SimulationCoroutineScopeImpl(safeContext)
-}
+ val simulationDispatcher: SimulationDispatcher
+ val interceptor = context[ContinuationInterceptor]
-private inline val CoroutineContext.simulationController: SimulationController
- get() {
- val handler = this[ContinuationInterceptor]
- return handler as? SimulationController ?: throw IllegalArgumentException(
- "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " +
- "the ContinuationInterceptor (Dispatcher)"
- )
+ if (interceptor != null) {
+ simulationDispatcher = interceptor.asSimulationDispatcher()
+ } else {
+ simulationDispatcher = scheduler
+ safeContext += scheduler.asCoroutineDispatcher()
}
+
+ return object : SimulationCoroutineScope, SimulationController by simulationDispatcher.asController() {
+ override val coroutineContext: CoroutineContext = safeContext
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt
index eca3b582..600102be 100644
--- a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationDispatcherTest.kt
@@ -28,15 +28,15 @@ import org.junit.jupiter.api.assertThrows
import java.time.Instant
/**
- * Test suite for the [SimulationScheduler] class.
+ * Test suite for the [SimulationDispatcher] class.
*/
-class SimulationSchedulerTest {
+class SimulationDispatcherTest {
/**
- * Test the basic functionality of [SimulationScheduler.runCurrent].
+ * Test the basic functionality of [SimulationDispatcher.runCurrent].
*/
@Test
fun testRunCurrent() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -58,11 +58,11 @@ class SimulationSchedulerTest {
}
/**
- * Test the clock of the [SimulationScheduler].
+ * Test the clock of the [SimulationDispatcher].
*/
@Test
fun testClock() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -70,8 +70,8 @@ class SimulationSchedulerTest {
scheduler.advanceBy(2)
assertEquals(2, scheduler.currentTime)
- assertEquals(2, scheduler.clock.millis())
- assertEquals(Instant.ofEpochMilli(2), scheduler.clock.instant())
+ assertEquals(2, scheduler.timeSource.millis())
+ assertEquals(Instant.ofEpochMilli(2), scheduler.timeSource.instant())
}
/**
@@ -79,7 +79,7 @@ class SimulationSchedulerTest {
*/
@Test
fun testAdvanceByLargeDelays() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
var count = 0
scheduler.schedule(1) { count += 1 }
@@ -87,10 +87,11 @@ class SimulationSchedulerTest {
scheduler.advanceBy(10)
scheduler.schedule(Long.MAX_VALUE) { count += 1 }
+ scheduler.scheduleCancellable(Long.MAX_VALUE) { count += 1 }
scheduler.schedule(100_000_000) { count += 1 }
scheduler.advanceUntilIdle()
- assertEquals(3, count)
+ assertEquals(4, count)
}
/**
@@ -98,7 +99,7 @@ class SimulationSchedulerTest {
*/
@Test
fun testNegativeDelays() {
- val scheduler = SimulationScheduler()
+ val scheduler = SimulationDispatcher()
assertThrows<IllegalArgumentException> { scheduler.schedule(-100) { } }
assertThrows<IllegalArgumentException> { scheduler.advanceBy(-100) }
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt
new file mode 100644
index 00000000..26419a50
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/kotlin/SimulationBuildersTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.kotlin
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for the Kotlin simulation builders.
+ */
+class SimulationBuildersTest {
+ @Test
+ fun testDelay() = runSimulation {
+ assertEquals(0, currentTime)
+ delay(100)
+ assertEquals(100, currentTime)
+ }
+
+ @Test
+ fun testController() = runSimulation {
+ var completed = false
+
+ launch {
+ delay(20)
+ completed = true
+ }
+
+ advanceBy(10)
+ assertFalse(completed)
+ advanceBy(11)
+ assertTrue(completed)
+
+ completed = false
+ launch { completed = true }
+ runCurrent()
+ assertTrue(completed)
+ }
+
+ @Test
+ fun testFailOnActiveJobs() {
+ assertThrows<IllegalStateException> {
+ runSimulation {
+ launch { suspendCancellableCoroutine {} }
+ }
+ }
+ }
+
+ @Test
+ fun testPropagateException() {
+ assertThrows<IllegalStateException> {
+ runSimulation {
+ throw IllegalStateException("Test")
+ }
+ }
+ }
+
+ @Test
+ fun testInvalidDispatcher() {
+ assertThrows<IllegalArgumentException> {
+ runSimulation(Dispatchers.Default) { }
+ }
+ }
+
+ @Test
+ fun testExistingJob() {
+ runSimulation(Job()) {
+ delay(10)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 04d46607..4f04bdc1 100644
--- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -28,8 +28,8 @@ plugins {
}
dependencies {
- api(libs.kotlinx.coroutines)
- implementation(libs.kotlin.logging)
+ api(projects.opendc.opendcCommon)
+ implementation(libs.slf4j.api)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(libs.slf4j.simple)
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
index fb112082..59dd3bad 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -60,7 +60,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkSink() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -71,7 +71,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkForward() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -85,7 +85,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinSingleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -103,7 +103,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinTripleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
index 0ebb0da9..c0f52505 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -23,11 +23,11 @@
package org.opendc.simulator.flow2;
import java.time.Clock;
+import java.time.InstantSource;
import java.util.ArrayList;
import java.util.List;
-import kotlin.coroutines.ContinuationInterceptor;
import kotlin.coroutines.CoroutineContext;
-import kotlinx.coroutines.Delay;
+import org.opendc.common.Dispatcher;
/**
* A {@link FlowEngine} simulates a generic flow network.
@@ -56,29 +56,25 @@ public final class FlowEngine implements Runnable {
*/
private boolean active;
- private final CoroutineContext coroutineContext;
- private final Clock clock;
- private final Delay delay;
+ private final Dispatcher dispatcher;
+ private final InstantSource clock;
/**
- * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}.
+ * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}.
*/
- public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) {
- return new FlowEngine(coroutineContext, clock);
+ public static FlowEngine create(Dispatcher dispatcher) {
+ return new FlowEngine(dispatcher);
}
- FlowEngine(CoroutineContext coroutineContext, Clock clock) {
- this.coroutineContext = coroutineContext;
- this.clock = clock;
-
- CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
- this.delay = (Delay) coroutineContext.get(key);
+ FlowEngine(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ this.clock = dispatcher.getTimeSource();
}
/**
* Obtain the (virtual) {@link Clock} driving the simulation.
*/
- public Clock getClock() {
+ public InstantSource getClock() {
return clock;
}
@@ -204,7 +200,7 @@ public final class FlowEngine implements Runnable {
// Only schedule a new scheduler invocation in case the target is earlier than all other pending
// scheduler invocations
if (scheduled.tryAdd(target)) {
- delay.invokeOnTimeout(target - now, this, coroutineContext);
+ dispatcher.schedule(target - now, this);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
index ed5579ea..25f87e04 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -67,7 +67,7 @@ public final class FlowStage {
*/
int timerIndex = -1;
- final Clock clock;
+ final InstantSource clock;
private final FlowStageLogic logic;
final FlowGraphInternal parentGraph;
private final FlowEngine engine;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
index fba12aaf..16fed4eb 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -40,7 +40,7 @@ public final class InPort implements Inlet {
OutPort output;
private InHandler handler = InHandlers.noop();
- private final Clock clock;
+ private final InstantSource clock;
private final String name;
private final FlowStage stage;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
index 332296a0..1f7ed4ee 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -42,7 +42,7 @@ public final class OutPort implements Outlet {
private OutHandler handler = OutHandlers.noop();
private final String name;
private final FlowStage stage;
- private final Clock clock;
+ private final InstantSource clock;
OutPort(FlowStage stage, String name, int id) {
this.name = name;
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
index 839835ce..467bf334 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class FlowEngineTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val multiplexer = MaxMinFlowMultiplexer(graph)
@@ -55,7 +55,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -65,7 +65,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -75,7 +75,7 @@ class FlowEngineTest {
@Test
fun testConnectInletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -87,7 +87,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -99,7 +99,7 @@ class FlowEngineTest {
@Test
fun testConnectInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 2.0f)
@@ -112,7 +112,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -125,7 +125,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -134,7 +134,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -143,7 +143,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -154,7 +154,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -165,7 +165,7 @@ class FlowEngineTest {
@Test
fun testInletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -181,7 +181,7 @@ class FlowEngineTest {
@Test
fun testOutletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
index a2ed2195..fef49786 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest {
*/
@Test
fun testTrace() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = ForwardingFlowMultiplexer(graph)
@@ -60,7 +60,7 @@ class ForwardingFlowMultiplexerTest {
advanceUntilIdle()
assertAll(
- { assertEquals(4000, clock.millis()) { "Took enough time" } }
+ { assertEquals(4000, timeSource.millis()) { "Took enough time" } }
)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
index ba339ee3..ebae2d4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -49,6 +49,6 @@ class MaxMinFlowMultiplexerTest {
advanceUntilIdle()
- assertEquals(500, clock.millis())
+ assertEquals(500, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
index a75efba3..ea516c63 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom
class FlowSinkTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -46,12 +46,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -64,12 +64,12 @@ class FlowSinkTest {
advanceUntilIdle()
- assertEquals(3000, clock.millis())
+ assertEquals(3000, timeSource.millis())
}
@Test
fun testUtilization() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -78,12 +78,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testFragments() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -100,7 +100,7 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
@@ -114,7 +114,7 @@ class FlowSinkTest {
)
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index 8b4ebb89..181d9a20 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -43,7 +43,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSinkTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -56,7 +56,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -66,7 +66,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectCircular() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -77,7 +77,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnectedTarget() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = mockk<SimNetworkPort>(relaxUnitFun = true)
@@ -90,7 +90,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source1 = TestSource(graph)
@@ -107,7 +107,7 @@ class SimNetworkSinkTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -127,7 +127,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index 1507c4a1..4a489478 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSwitchVirtualTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -60,7 +60,7 @@ class SimNetworkSwitchVirtualTest {
@Test
fun testConnectClosedPort() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val switch = SimNetworkSwitchVirtual(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index 6adb0548..f596ca4e 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPduTest {
@Test
fun testZeroOutlets() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -48,7 +48,7 @@ internal class SimPduTest {
@Test
fun testSingleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -62,7 +62,7 @@ internal class SimPduTest {
@Test
fun testDoubleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val pdu = SimPdu(graph)
@@ -78,7 +78,7 @@ internal class SimPduTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 300.0f)
val pdu = SimPdu(graph)
@@ -95,7 +95,7 @@ internal class SimPduTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -110,7 +110,7 @@ internal class SimPduTest {
@Test
fun testOutletClose() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index 03b8182c..03c942b4 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -42,7 +42,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPowerSourceTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -57,7 +57,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -67,7 +67,7 @@ internal class SimPowerSourceTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -87,7 +87,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -102,7 +102,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectAssertion() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -120,7 +120,7 @@ internal class SimPowerSourceTest {
@Test
fun testOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -135,7 +135,7 @@ internal class SimPowerSourceTest {
@Test
fun testInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 0dd7bb05..89fede63 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimUpsTest {
@Test
fun testSingleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val ups = SimUps(graph)
@@ -49,7 +49,7 @@ internal class SimUpsTest {
@Test
fun testDoubleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
@@ -69,7 +69,7 @@ internal class SimUpsTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -84,7 +84,7 @@ internal class SimUpsTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
index 7ba12ed9..d5f509e7 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/TestInlet.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.power
-import io.mockk.spyk
import org.opendc.simulator.flow2.FlowGraph
import org.opendc.simulator.flow2.FlowStage
import org.opendc.simulator.flow2.FlowStageLogic
@@ -32,8 +31,7 @@ import org.opendc.simulator.flow2.Outlet
* A test inlet.
*/
class TestInlet(graph: FlowGraph) : SimPowerInlet(), FlowStageLogic {
- val logic = spyk(this)
- private val stage = graph.newStage(logic)
+ private val stage = graph.newStage(this)
val flowOutlet = stage.getOutlet("out")
init {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 3aac2630..86c1c521 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -260,7 +260,7 @@ public class OpenDCRunner(
val scenario = scenario
- Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(
serviceDomain,
@@ -285,7 +285,7 @@ public class OpenDCRunner(
}
// Run workload trace
- service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+ service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference)
val serviceMetrics = service.getSchedulerStats()
logger.debug {
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 2436c387..07b43b6d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -30,9 +31,7 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A service for cloud workflow execution.
@@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable {
/**
* Construct a new [WorkflowService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
- * @param meterProvider The meter provider to use.
+ * @param dispatcher A [Dispatcher] to schedule future events.
* @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
@@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable {
* @param taskOrderPolicy The task order policy to use.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable {
taskOrderPolicy: TaskOrderPolicy
): WorkflowService {
return WorkflowServiceImpl(
- context,
- clock,
+ dispatcher,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index b1780896..01c1f565 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Image
@@ -40,11 +42,10 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.PriorityQueue
import java.util.Queue
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
@@ -52,8 +53,7 @@ import kotlin.coroutines.resume
* Datacenter Scheduling.
*/
public class WorkflowServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val computeClient: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -64,7 +64,12 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job())
+
+ /**
+ * The [InstantSource] representing the clock of this service.
+ */
+ private val clock = dispatcher.timeSource
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -149,7 +154,7 @@ public class WorkflowServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index b165418a..e5e05a92 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
val computeService = "compute.opendc.org"
val workflowService = "workflow.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
val scheduler: (ProvisioningContext) -> ComputeScheduler = {
FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
@@ -103,7 +103,7 @@ internal class WorkflowServiceTest {
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
- service.replay(clock, trace.toJobs())
+ service.replay(timeSource, trace.toJobs())
val metrics = service.getSchedulerStats()
@@ -119,7 +119,7 @@ internal class WorkflowServiceTest {
},
{ assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } }
+ { assertEquals(45977707L, timeSource.millis()) { "Total duration incorrect" } }
)
}
}
diff --git a/site/docs/advanced-guides/toolchain.md b/site/docs/advanced-guides/toolchain.md
index 36efece7..a1735767 100644
--- a/site/docs/advanced-guides/toolchain.md
+++ b/site/docs/advanced-guides/toolchain.md
@@ -22,7 +22,7 @@ Follow the steps below to get it all set up!
## 1. Installing Java
-OpenDC requires a Java installation of version 11 or higher. Make sure to install
+OpenDC requires a Java installation of version 17 or higher. Make sure to install
the [JDK](https://www.oracle.com/technetwork/java/javase/downloads/index.html), not only the JRE (the JDK also includes
a JRE).
diff --git a/site/docs/getting-started/0-installation.md b/site/docs/getting-started/0-installation.md
index 2747c344..9b9b0e82 100644
--- a/site/docs/getting-started/0-installation.md
+++ b/site/docs/getting-started/0-installation.md
@@ -14,7 +14,7 @@ quicker.
1. **Supported Platforms**
OpenDC is actively tested on Windows, macOS and GNU/Linux.
2. **Required Software**
- A Java installation of version 11 or higher is required for OpenDC. You may download the
+ A Java installation of version 17 or higher is required for OpenDC. You may download the
[Java distribution from Oracle](https://www.oracle.com/java/technologies/downloads/) or use the distribution provided
by your package manager.