summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-24 13:54:59 +0100
committerGitHub <noreply@github.com>2025-01-24 13:54:59 +0100
commitbe9698483f8e7891b5c2d562eaeac9dd3edbf9d8 (patch)
tree60b27e2ff80f76c5aa7736ca64f2ae0580348930
parentbb945c2fdd7b20898e3dfccbac7da2a427418216 (diff)
Added Fragment scaling (#296)
* Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later. Updated SimTraceWorkload to properly handle creating checkpoints Fixed a bug with the updatedConsumers in the FlowDistributor Implemented a first version of scaling the runtime of fragments. * small update * updated tests to reflect the changes in the checkpointing model * Updated the checkpointing tests to reflect the changes made * updated wrapper-validation-action * Applied spotless
-rw-r--r--.github/workflows/build.yml2
-rw-r--r--.github/workflows/publish.yml2
-rw-r--r--.github/workflows/release.yml2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt19
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt23
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt4
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt157
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt337
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java)120
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java)2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java)63
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java47
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java47
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java59
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java4
26 files changed, 810 insertions, 163 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 807c6d26..71fb15a8 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -22,7 +22,7 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- name: Validate Gradle wrapper
- uses: gradle/wrapper-validation-action@v1
+ uses: gradle/wrapper-validation-action@v3
- name: Set up JDK
uses: actions/setup-java@v4
with:
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index e2b09550..660b7899 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -90,7 +90,7 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- name: Validate Gradle wrapper
- uses: gradle/wrapper-validation-action@v1
+ uses: gradle/wrapper-validation-action@v3
- name: Set up JDK
uses: actions/setup-java@v3
with:
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 5b3357f4..3079d25a 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -13,7 +13,7 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- name: Validate Gradle wrapper
- uses: gradle/wrapper-validation-action@v1
+ uses: gradle/wrapper-validation-action@v3
- name: Set up JDK
uses: actions/setup-java@v3
with:
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index ee2cb319..7f5f09eb 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -31,8 +31,9 @@ import org.opendc.compute.simulator.telemetry.GuestSystemStats
import org.opendc.simulator.compute.machine.SimMachine
import org.opendc.simulator.compute.machine.VirtualMachine
import org.opendc.simulator.compute.workload.ChainWorkload
-import org.opendc.simulator.compute.workload.TraceFragment
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling
import java.time.Duration
import java.time.Instant
import java.time.InstantSource
@@ -93,6 +94,8 @@ public class Guest(
onStart()
+ val scalingPolicy = NoDelayScaling()
+
val bootworkload =
TraceWorkload(
ArrayList(
@@ -107,6 +110,7 @@ public class Guest(
0,
0,
0.0,
+ scalingPolicy,
)
if (task.workload is TraceWorkload) {
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 655bacb9..2b8b589f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -23,7 +23,9 @@
package org.opendc.compute.workload
import mu.KotlinLogging
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy
import org.opendc.trace.Trace
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
@@ -53,6 +55,7 @@ public class ComputeWorkloadLoader(
private val checkpointInterval: Long = 0L,
private val checkpointDuration: Long = 0L,
private val checkpointIntervalScaling: Double = 1.0,
+ private val scalingPolicy: ScalingPolicy = NoDelayScaling(),
) : WorkloadLoader(subMissionTime) {
/**
* The logger for this instance.
@@ -84,7 +87,10 @@ public class ComputeWorkloadLoader(
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) }
+ val builder =
+ fragments.computeIfAbsent(
+ id,
+ ) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy) }
builder.add(durationMs, cpuUsage, cores)
}
@@ -178,7 +184,12 @@ public class ComputeWorkloadLoader(
/**
* A builder for a VM trace.
*/
- private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double) {
+ private class Builder(
+ checkpointInterval: Long,
+ checkpointDuration: Long,
+ checkpointIntervalScaling: Double,
+ scalingPolicy: ScalingPolicy,
+ ) {
/**
* The total load of the trace.
*/
@@ -187,7 +198,7 @@ public class ComputeWorkloadLoader(
/**
* The internal builder for the trace.
*/
- private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling)
+ private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy)
/**
* Add a fragment to the trace.
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index 60be9299..7a5089b9 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -22,7 +22,7 @@
package org.opendc.compute.workload
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
import java.time.Instant
import java.util.UUID
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
index 87c7abe9..cf40d88d 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
@@ -25,6 +25,9 @@ package org.opendc.experiments.base.experiment.specs
import kotlinx.serialization.Serializable
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.WorkloadLoader
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling
+import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy
import java.io.File
/**
@@ -41,6 +44,7 @@ public data class WorkloadSpec(
val type: WorkloadTypes,
val sampleFraction: Double = 1.0,
val submissionTime: String? = null,
+ val scalingPolicy: ScalingPolicyEnum = ScalingPolicyEnum.NoDelay,
) {
public val name: String = File(pathToFile).nameWithoutExtension
@@ -56,11 +60,6 @@ public data class WorkloadSpec(
* @constructor Create empty Workload types
*/
public enum class WorkloadTypes {
- /**
- * Compute workload
- *
- * @constructor Create empty Compute workload
- */
ComputeWorkload,
}
@@ -74,6 +73,7 @@ public fun getWorkloadLoader(
checkpointInterval: Long,
checkpointDuration: Long,
checkpointIntervalScaling: Double,
+ scalingPolicy: ScalingPolicy,
): WorkloadLoader {
return when (type) {
WorkloadTypes.ComputeWorkload ->
@@ -83,6 +83,19 @@ public fun getWorkloadLoader(
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
+ scalingPolicy,
)
}
}
+
+public enum class ScalingPolicyEnum {
+ NoDelay,
+ Perfect,
+}
+
+public fun getScalingPolicy(scalingPolicyEnum: ScalingPolicyEnum): ScalingPolicy {
+ return when (scalingPolicyEnum) {
+ ScalingPolicyEnum.NoDelay -> NoDelayScaling()
+ ScalingPolicyEnum.Perfect -> PerfectScaling()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index c9c2729d..56278bf2 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -33,6 +33,7 @@ import org.opendc.compute.simulator.service.ComputeService
import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
import org.opendc.compute.topology.clusterTopology
import org.opendc.experiments.base.experiment.Scenario
+import org.opendc.experiments.base.experiment.specs.getScalingPolicy
import org.opendc.experiments.base.experiment.specs.getWorkloadLoader
import org.opendc.simulator.kotlin.runSimulation
import java.io.File
@@ -80,6 +81,8 @@ public fun runScenario(
val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L
val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0
+ val scalingPolicy = getScalingPolicy(scenario.workloadSpec.scalingPolicy)
+
val workloadLoader =
getWorkloadLoader(
scenario.workloadSpec.type,
@@ -88,6 +91,7 @@ public fun runScenario(
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
+ scalingPolicy,
)
val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction)
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt
index 091f506a..895eee92 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.workload.Task
-import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceFragment
import java.util.ArrayList
/**
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
index 56850558..e271fce7 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.workload.Task
-import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceFragment
import java.util.ArrayList
/**
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt
index 90737ab6..3231f533 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.workload.Task
import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec
-import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceFragment
import java.util.ArrayList
/**
@@ -225,15 +225,12 @@ class FailuresAndCheckpointingTest {
}
/**
- * Failure test 1: Single Task with checkpointing
+ * Checkpointing test 1: Single Task with checkpointing
* In this test, a single task is scheduled that is interrupted by a failure after 5 min.
- * Because there is no checkpointing, the full task has to be rerun.
- *
- * This means the final runtime is 20 minutes
+ * The system is using checkpointing, taking snapshots every minute.
*
- * When the task is running, it is using 50% of the cpu.
- * This means that half of the time is active, and half is idle.
- * When the task is failed, all time is idle.
+ * This means that after failure, only 6 minutes of the task is left.
+ * However, taking a snapshot takes 1 second, which means 9 seconds have to be added to the total runtime.
*/
@Test
fun testCheckpoints1() {
@@ -256,22 +253,20 @@ class FailuresAndCheckpointingTest {
assertAll(
{ assertEquals((10 * 60000) + (9 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
- { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
- { assertEquals(((10 * 30000) + (9 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
- { assertEquals((10 * 60 * 150.0) + (9 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ { assertEquals((10 * 60 * 150.0) + (9 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
)
}
/**
- * Failure test 2: Single Task with scaling checkpointing
+ * Checkpointing test 2: Single Task with checkpointing, higher cpu demand
* In this test, a single task is scheduled that is interrupted by a failure after 5 min.
- * Because there is no checkpointing, the full task has to be rerun.
+ * The system is using checkpointing, taking snapshots every minute.
*
- * This means the final runtime is 20 minutes
+ * This means that after failure, only 16 minutes of the task is left.
+ * However, taking a snapshot takes 1 second, which means 19 seconds have to be added to the total runtime.
*
- * When the task is running, it is using 50% of the cpu.
- * This means that half of the time is active, and half is idle.
- * When the task is failed, all time is idle.
+ * This is similar to the previous test, but the cpu demand of taking a snapshot is higher.
+ * The cpu demand of taking a snapshot is as high as the highest fragment
*/
@Test
fun testCheckpoints2() {
@@ -281,6 +276,86 @@ class FailuresAndCheckpointingTest {
name = "0",
fragments =
arrayListOf(
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ ),
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ (10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0),
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect energy usage" }
+ },
+ )
+ }
+
+ /**
+ * Checkpointing test 3: Single Task with checkpointing, higher cpu demand
+ * In this test, a single task is scheduled that is interrupted by a failure after 5 min.
+ * The system is using checkpointing, taking snapshots every minute.
+ *
+ * This means that after failure, only 16 minutes of the task is left.
+ * However, taking a snapshot takes 1 second, which means 19 seconds have to be added to the total runtime.
+ *
+ * This is similar to the previous test, but the fragments are reversed
+ *
+ */
+ @Test
+ fun testCheckpoints3() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ ),
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ (10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0),
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect energy usage" }
+ },
+ )
+ }
+
+ /**
+ * Checkpointing test 4: Single Task with scaling checkpointing
+ * In this test, checkpointing is used, with a scaling factor of 1.5
+ *
+ * This means that the interval between checkpoints starts at 1 min, but is multiplied by 1.5 every snapshot.
+ *
+ */
+ @Test
+ fun testCheckpoints4() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
TraceFragment(10 * 60 * 1000, 1000.0, 1),
),
checkpointInterval = 60 * 1000L,
@@ -295,20 +370,18 @@ class FailuresAndCheckpointingTest {
assertAll(
{ assertEquals((10 * 60000) + (4 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
- { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
- { assertEquals(((10 * 30000) + (4 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
- { assertEquals((10 * 60 * 150.0) + (4 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ { assertEquals((10 * 60 * 150.0) + (4 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
)
}
/**
- * Checkpoint test 3: Single Task, single failure with checkpointing
+ * Checkpointing test 5: Single Task, single failure with checkpointing
* In this test, a single task is scheduled that is interrupted by a failure after 5 min.
* Because there is no checkpointing, the full task has to be rerun.
*
*/
@Test
- fun testCheckpoints3() {
+ fun testCheckpoints5() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -336,23 +409,7 @@ class FailuresAndCheckpointingTest {
{ assertEquals((960 * 1000) + 5000, monitor.maxTimestamp) { "Total runtime incorrect" } },
{
assertEquals(
- ((300 * 1000) + (296 * 500) + (360 * 500)).toLong(),
- monitor.hostIdleTimes["H01"]?.sum(),
- ) { "Idle time incorrect" }
- },
- {
- assertEquals(
- ((296 * 500) + 4000 + (360 * 500) + 5000).toLong(),
- monitor.hostActiveTimes["H01"]?.sum(),
- ) { "Active time incorrect" }
- },
- { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
- { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
- { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
- {
- assertEquals(
- (296 * 150.0) + (4 * 200.0) + (300 * 100.0) +
- (360 * 150.0) + (5 * 200.0),
+ (665 * 150.0) + (300 * 100.0),
monitor.hostEnergyUsages["H01"]?.sum(),
) { "Incorrect energy usage" }
},
@@ -360,13 +417,13 @@ class FailuresAndCheckpointingTest {
}
/**
- * Checkpoint test 4: Single Task, repeated failure with checkpointing
+ * Checkpointing test 6: Single Task, repeated failure with checkpointing
* In this test, a single task is scheduled that is interrupted by a failure after 5 min.
* Because there is no checkpointing, the full task has to be rerun.
*
*/
@Test
- fun testCheckpoints4() {
+ fun testCheckpoints6() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -394,23 +451,7 @@ class FailuresAndCheckpointingTest {
{ assertEquals((22 * 60000) + 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
{
assertEquals(
- ((10 * 60000) + (2 * 296 * 500) + (120 * 500)).toLong(),
- monitor.hostIdleTimes["H01"]?.sum(),
- ) { "Idle time incorrect" }
- },
- {
- assertEquals(
- ((2 * 296 * 500) + 8000 + (120 * 500) + 1000).toLong(),
- monitor.hostActiveTimes["H01"]?.sum(),
- ) { "Active time incorrect" }
- },
- { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
- { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
- { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
- {
- assertEquals(
- (2 * 296 * 150.0) + (8 * 200.0) + (600 * 100.0) +
- (120 * 150.0) + (200.0),
+ (300 * 150.0) + (300 * 100.0) + (300 * 150.0) + (300 * 100.0) + (121 * 150.0),
monitor.hostEnergyUsages["H01"]?.sum(),
) { "Incorrect energy usage" }
},
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
index 4a7c9341..3d733360 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.workload.Task
-import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceFragment
import java.util.ArrayList
/**
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt
new file mode 100644
index 00000000..b0aa3555
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.base
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.workload.Task
+import org.opendc.simulator.compute.workload.trace.TraceFragment
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling
+import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling
+import java.util.ArrayList
+
+/**
+ * Testing suite containing tests that specifically test the scaling of trace fragments
+ */
+class FragmentScalingTest {
+ /**
+ * Scaling test 1: A single fitting task
+ * In this test, a single task is scheduled that should fit the system.
+ * This means nothing will be delayed regardless of the scaling policy
+ */
+ @Test
+ fun testScaling1() {
+ val workloadNoDelay: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ )
+
+ val workloadPerfect: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ )
+ val topology = createTopology("single_1_2000.json")
+
+ val monitorNoDelay = runTest(topology, workloadNoDelay)
+ val monitorPerfect = runTest(topology, workloadPerfect)
+
+ assertAll(
+ { assertEquals(1200000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(2000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } },
+ )
+ }
+
+ /**
+ * Scaling test 2: A single task with a single non-fitting fragment
+ * In this test, a single task is scheduled that should not fit.
+ * This means the Task is getting only 2000 Mhz while it was demanding 4000 Mhz
+ *
+ * For the NoDelay scaling policy, the task should take the planned 10 min.
+ * For the Perfect scaling policy, the task should be slowed down by 50% resulting in a runtime of 20 min.
+ */
+ @Test
+ fun testScaling2() {
+ val workloadNoDelay: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ )
+
+ val workloadPerfect: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ )
+ val topology = createTopology("single_1_2000.json")
+
+ val monitorNoDelay = runTest(topology, workloadNoDelay)
+ val monitorPerfect = runTest(topology, workloadPerfect)
+
+ assertAll(
+ { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ )
+ }
+
+ /**
+ * Scaling test 3: A single task that switches between fitting and not fitting
+ * In this test, a single task is scheduled has one fragment that does not fit
+ * This means the second fragment is getting only 2000 Mhz while it was demanding 4000 Mhz
+ *
+ * For the NoDelay scaling policy, the task should take the planned 30 min.
+ * For the Perfect scaling policy, the second fragment should be slowed down by 50% resulting in a runtime of 20 min,
+ * and a total runtime of 40 min.
+ */
+ @Test
+ fun testScaling3() {
+ val workloadNoDelay: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ TraceFragment(10 * 60 * 1000, 1500.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ )
+
+ val workloadPerfect: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ TraceFragment(10 * 60 * 1000, 1500.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ )
+ val topology = createTopology("single_1_2000.json")
+
+ val monitorNoDelay = runTest(topology, workloadNoDelay)
+ val monitorPerfect = runTest(topology, workloadPerfect)
+
+ assertAll(
+ { assertEquals(1800000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(2400000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(1500.0, monitorNoDelay.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1500.0, monitorNoDelay.taskCpuSupplied["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(19)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(1500.0, monitorPerfect.taskCpuDemands["0"]?.get(29)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(1500.0, monitorPerfect.taskCpuSupplied["0"]?.get(29)) { "The cpu supplied to task 0 is incorrect" } },
+ )
+ }
+
+ /**
+ * Scaling test 4: Two tasks, that both fit
+ * In this test, two tasks are scheduled that both fit
+ *
+ * For both scaling policies, the tasks should run without delay.
+ */
+ @Test
+ fun testScaling4() {
+ val workloadNoDelay: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 3000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ )
+
+ val workloadPerfect: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 3000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ )
+ val topology = createTopology("single_2_2000.json")
+
+ val monitorNoDelay = runTest(topology, workloadNoDelay)
+ val monitorPerfect = runTest(topology, workloadPerfect)
+
+ assertAll(
+ { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(600000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(3000.0, monitorPerfect.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } },
+ { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(3000.0, monitorNoDelay.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } },
+ { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+ { assertEquals(3000.0, monitorPerfect.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } },
+ )
+ }
+
+ /**
+ * Scaling test 5: Two tasks, that don't fit together
+ * In this test, two tasks are scheduled that do not fit together
+ * This means the Task_1 is getting only 2000 Mhz while it was demanding 4000 Mhz
+ *
+ * For the NoDelay scaling policy, the tasks should complete in 10 min
+ * For the Perfect scaling policy, task_1 is delayed while task_0 is still going.
+ * In the first 10 min (while Task_0 is still running), Task_1 is running at 50%.
+ * This means that after Task_0 is done, Task_1 still needs to run for 5 minutes, making the total runtime 15 min.
+ */
+ @Test
+ fun testScaling5() {
+ val workloadNoDelay: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ ),
+ scalingPolicy = NoDelayScaling(),
+ ),
+ )
+
+ val workloadPerfect: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ ),
+ scalingPolicy = PerfectScaling(),
+ ),
+ )
+ val topology = createTopology("single_2_2000.json")
+
+// val monitorNoDelay = runTest(topology, workloadNoDelay)
+ val monitorPerfect = runTest(topology, workloadPerfect)
+
+// assertAll(
+// { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } },
+// { assertEquals(900000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } },
+//
+// { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+// { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } },
+// { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } },
+// { assertEquals(3000.0, monitorPerfect.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } },
+//
+// { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+// { assertEquals(3000.0, monitorNoDelay.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } },
+// { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } },
+// { assertEquals(3000.0, monitorPerfect.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } },
+// )
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt
index 6d80ce56..f9a20c68 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt
@@ -30,7 +30,7 @@ import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
import org.opendc.compute.simulator.scheduler.filters.RamFilter
import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
import org.opendc.compute.workload.Task
-import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceFragment
import java.util.ArrayList
class SchedulerTest {
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
index eadd97e4..df45f374 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
@@ -43,8 +43,10 @@ import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.workload.Task
import org.opendc.experiments.base.experiment.specs.FailureModelSpec
import org.opendc.experiments.base.runner.replay
-import org.opendc.simulator.compute.workload.TraceFragment
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceFragment
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy
import org.opendc.simulator.kotlin.runSimulation
import java.time.Duration
import java.time.LocalDateTime
@@ -69,6 +71,7 @@ fun createTestTask(
checkpointInterval: Long = 0L,
checkpointDuration: Long = 0L,
checkpointIntervalScaling: Double = 1.0,
+ scalingPolicy: ScalingPolicy = NoDelayScaling(),
): Task {
return Task(
UUID.nameUUIDFromBytes(name.toByteArray()),
@@ -84,6 +87,7 @@ fun createTestTask(
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
+ scalingPolicy,
),
)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
index 2919fc3a..ebfcc552 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
@@ -55,11 +55,11 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer {
public abstract Workload getSnapshot();
- abstract void createCheckpointModel();
+ public abstract void createCheckpointModel();
- abstract long getCheckpointInterval();
+ public abstract long getCheckpointInterval();
- abstract long getCheckpointDuration();
+ public abstract long getCheckpointDuration();
- abstract double getCheckpointIntervalScaling();
+ public abstract double getCheckpointIntervalScaling();
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
index 9b12b1e3..93733268 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
@@ -20,9 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
import java.util.LinkedList;
+import org.opendc.simulator.compute.workload.SimWorkload;
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling;
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy;
import org.opendc.simulator.engine.graph.FlowConsumer;
import org.opendc.simulator.engine.graph.FlowEdge;
import org.opendc.simulator.engine.graph.FlowGraph;
@@ -37,13 +40,18 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
private long startOfFragment;
private FlowEdge machineEdge;
- private double currentDemand;
- private double currentSupply;
+
+ private double cpuFreqDemand = 0.0; // The Cpu demanded by fragment
+ private double cpuFreqSupplied = 0.0; // The Cpu speed supplied
+ private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied
+ private double remainingWork = 0.0; // The duration of the fragment at the demanded speed
private long checkpointDuration;
private TraceWorkload snapshot;
+ private ScalingPolicy scalingPolicy = new NoDelayScaling();
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Basic Getters and Setters
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -57,27 +65,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
}
@Override
- long getCheckpointInterval() {
+ public long getCheckpointInterval() {
return 0;
}
@Override
- long getCheckpointDuration() {
+ public long getCheckpointDuration() {
return 0;
}
@Override
- double getCheckpointIntervalScaling() {
+ public double getCheckpointIntervalScaling() {
return 0;
}
- public TraceFragment getNextFragment() {
- this.currentFragment = this.remainingFragments.pop();
- this.fragmentIndex++;
-
- return this.currentFragment;
- }
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Constructors
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -87,15 +88,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.snapshot = workload;
this.checkpointDuration = workload.getCheckpointDuration();
+ this.scalingPolicy = workload.getScalingPolicy();
this.remainingFragments = new LinkedList<>(workload.getFragments());
this.fragmentIndex = 0;
final FlowGraph graph = ((FlowNode) supplier).getGraph();
graph.addEdge(this, supplier);
-
- this.currentFragment = this.getNextFragment();
- pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
- this.startOfFragment = now;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -105,36 +103,50 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
@Override
public long onUpdate(long now) {
long passedTime = getPassedTime(now);
- long duration = this.currentFragment.duration();
+ this.startOfFragment = now;
- // The current Fragment has not yet been finished, continue
- if (passedTime < duration) {
- return now + (duration - passedTime);
+ // The amount of work done since last update
+ double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime);
+
+ this.remainingWork -= finishedWork;
+
+ // If this.remainingWork <= 0, the fragment has been completed
+ if (this.remainingWork <= 0) {
+ this.startNextFragment();
+
+ this.invalidate();
+ return Long.MAX_VALUE;
}
- // Loop through fragments until the passed time is filled.
- // We need a while loop to account for skipping of fragments.
- while (passedTime >= duration) {
- if (this.remainingFragments.isEmpty()) {
- this.stopWorkload();
- return Long.MAX_VALUE;
- }
+ this.cpuFreqSupplied = this.newCpuFreqSupplied;
- passedTime = passedTime - duration;
+ // The amount of time required to finish the fragment at this speed
+ long remainingDuration = this.scalingPolicy.getRemainingDuration(
+ this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork);
- // get next Fragment
- currentFragment = this.getNextFragment();
- duration = currentFragment.duration();
+ return now + remainingDuration;
+ }
+
+ public TraceFragment getNextFragment() {
+ if (this.remainingFragments.isEmpty()) {
+ return null;
}
+ this.currentFragment = this.remainingFragments.pop();
+ this.fragmentIndex++;
- // start new fragment
- this.startOfFragment = now - passedTime;
+ return this.currentFragment;
+ }
- // Change the cpu Usage to the new Fragment
- pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
+ private void startNextFragment() {
- // Return the time when the current fragment will complete
- return this.startOfFragment + duration;
+ TraceFragment nextFragment = this.getNextFragment();
+ if (nextFragment == null) {
+ this.stopWorkload();
+ return;
+ }
+ double demand = nextFragment.cpuUsage();
+ this.remainingWork = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration());
+ this.pushOutgoingDemand(this.machineEdge, demand);
}
@Override
@@ -159,7 +171,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* TODO: Maybe add checkpoint models for SimTraceWorkload
*/
@Override
- void createCheckpointModel() {}
+ public void createCheckpointModel() {}
/**
* Create a new snapshot based on the current status of the workload.
@@ -171,7 +183,15 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
// Get remaining time of current fragment
long passedTime = getPassedTime(now);
- long remainingTime = currentFragment.duration() - passedTime;
+
+ // The amount of work done since last update
+ double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime);
+
+ this.remainingWork -= finishedWork;
+
+ // The amount of time required to finish the fragment at this speed
+ long remainingTime =
+ this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork);
// If this is the end of the Task, don't make a snapshot
if (remainingTime <= 0 && remainingFragments.isEmpty()) {
@@ -189,13 +209,13 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.remainingFragments.addFirst(newFragment);
// Create and add a fragment for processing the snapshot process
- // TODO: improve the implementation of cpuUsage and coreCount
- TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, 123456, 1);
+ TraceFragment snapshotFragment = new TraceFragment(
+ this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount());
this.remainingFragments.addFirst(snapshotFragment);
this.fragmentIndex = -1;
- this.currentFragment = getNextFragment();
- pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage());
+ startNextFragment();
+
this.startOfFragment = now;
this.invalidate();
@@ -213,11 +233,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
*/
@Override
public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
- if (newSupply == this.currentSupply) {
+ if (newSupply == this.cpuFreqSupplied) {
return;
}
- this.currentSupply = newSupply;
+ this.cpuFreqSupplied = this.newCpuFreqSupplied;
+ this.newCpuFreqSupplied = newSupply;
+
+ this.invalidate();
}
/**
@@ -228,11 +251,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
*/
@Override
public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
- if (newDemand == this.currentDemand) {
+ if (newDemand == this.cpuFreqDemand) {
return;
}
- this.currentDemand = newDemand;
+ this.cpuFreqDemand = newDemand;
this.machineEdge.pushDemand(newDemand);
}
@@ -257,6 +280,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
if (this.machineEdge == null) {
return;
}
+
this.stopWorkload();
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java
index 550c2135..a09206a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
public record TraceFragment(long duration, double cpuUsage, int coreCount) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
index 7f82ab71..47292a7b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
@@ -20,10 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import org.opendc.simulator.compute.workload.SimWorkload;
+import org.opendc.simulator.compute.workload.Workload;
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling;
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy;
import org.opendc.simulator.engine.graph.FlowSupplier;
public class TraceWorkload implements Workload {
@@ -31,20 +36,40 @@ public class TraceWorkload implements Workload {
private final long checkpointInterval;
private final long checkpointDuration;
private final double checkpointIntervalScaling;
+ private final double maxCpuDemand;
+ private final int maxCoreCount;
+
+ public ScalingPolicy getScalingPolicy() {
+ return scalingPolicy;
+ }
+
+ private final ScalingPolicy scalingPolicy;
public TraceWorkload(
ArrayList<TraceFragment> fragments,
long checkpointInterval,
long checkpointDuration,
- double checkpointIntervalScaling) {
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
this.fragments = fragments;
this.checkpointInterval = checkpointInterval;
this.checkpointDuration = checkpointDuration;
this.checkpointIntervalScaling = checkpointIntervalScaling;
+ this.scalingPolicy = scalingPolicy;
+
+ // TODO: remove if we decide not to use it.
+ this.maxCpuDemand = fragments.stream()
+ .max(Comparator.comparing(TraceFragment::cpuUsage))
+ .get()
+ .cpuUsage();
+ this.maxCoreCount = fragments.stream()
+ .max(Comparator.comparing(TraceFragment::coreCount))
+ .get()
+ .coreCount();
}
public TraceWorkload(ArrayList<TraceFragment> fragments) {
- this(fragments, 0L, 0L, 1.0);
+ this(fragments, 0L, 0L, 1.0, new NoDelayScaling());
}
public ArrayList<TraceFragment> getFragments() {
@@ -66,6 +91,14 @@ public class TraceWorkload implements Workload {
return checkpointIntervalScaling;
}
+ public int getMaxCoreCount() {
+ return maxCoreCount;
+ }
+
+ public double getMaxCpuDemand() {
+ return maxCpuDemand;
+ }
+
public void removeFragments(int numberOfFragments) {
if (numberOfFragments <= 0) {
return;
@@ -83,11 +116,15 @@ public class TraceWorkload implements Workload {
}
public static Builder builder() {
- return builder(0L, 0L, 0.0);
+ return builder(0L, 0L, 0.0, new NoDelayScaling());
}
- public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) {
- return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling);
+ public static Builder builder(
+ long checkpointInterval,
+ long checkpointDuration,
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
+ return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy);
}
/**
@@ -125,15 +162,21 @@ public class TraceWorkload implements Workload {
private final long checkpointInterval;
private final long checkpointDuration;
private final double checkpointIntervalScaling;
+ private final ScalingPolicy scalingPolicy;
/**
* Construct a new {@link Builder} instance.
*/
- private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) {
+ private Builder(
+ long checkpointInterval,
+ long checkpointDuration,
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
this.fragments = new ArrayList<>();
this.checkpointInterval = checkpointInterval;
this.checkpointDuration = checkpointDuration;
this.checkpointIntervalScaling = checkpointIntervalScaling;
+ this.scalingPolicy = scalingPolicy;
}
/**
@@ -152,7 +195,11 @@ public class TraceWorkload implements Workload {
*/
public TraceWorkload build() {
return new TraceWorkload(
- this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling);
+ this.fragments,
+ this.checkpointInterval,
+ this.checkpointDuration,
+ this.checkpointIntervalScaling,
+ this.scalingPolicy);
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java
new file mode 100644
index 00000000..4230bb55
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * The NoDelay scaling policy states that there will be no delay
+ * when less CPU can be provided than needed.
+ *
+ * This could be used in situations where the data is streamed.
+ * This will also result in the same behaviour as older OpenDC.
+ */
+public class NoDelayScaling implements ScalingPolicy {
+ @Override
+ public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) {
+ return cpuFreqDemand * passedTime;
+ }
+
+ @Override
+ public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) {
+ return (long) (remainingWork / cpuFreqDemand);
+ }
+
+ @Override
+ public double getRemainingWork(double cpuFreqDemand, long duration) {
+ return cpuFreqDemand * duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java
new file mode 100644
index 00000000..7eae70e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * PerfectScaling scales the workload duration perfectly
+ * based on the CPU capacity.
+ *
+ * This means that if a fragment has a duration of 10 min at 4000 mHz,
+ * it will take 20 min and 2000 mHz.
+ */
+public class PerfectScaling implements ScalingPolicy {
+ @Override
+ public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) {
+ return cpuFreqSupplied * passedTime;
+ }
+
+ @Override
+ public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) {
+ return (long) (remainingWork / cpuFreqSupplied);
+ }
+
+ @Override
+ public double getRemainingWork(double cpuFreqDemand, long duration) {
+ return cpuFreqDemand * duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java
new file mode 100644
index 00000000..a0f473ba
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * Interface for the scaling policy.
+ * A scaling decides how a TaskFragment should scale when it is not getting the demanded capacity
+ */
+public interface ScalingPolicy {
+
+ /**
+ * Calculate how much work was finished based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param cpuFreqSupplied
+ * @param passedTime
+ * @return
+ */
+ double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime);
+
+ /**
+ * Calculate the remaining duration of this fragment based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param cpuFreqSupplied
+ * @param remainingWork
+ * @return
+ */
+ long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork);
+
+ /**
+ * Calculate how much work is remaining based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param duration
+ * @return
+ */
+ double getRemainingWork(double cpuFreqDemand, long duration);
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
index ad69a3d6..49baaf48 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -24,7 +24,7 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.machine.SimMachine
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 67540f4e..3f18ac76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -139,16 +139,16 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowEventQueue timerQueue = this.eventQueue;
- timerQueue.enqueue(ctx);
+ FlowEventQueue eventQueue = this.eventQueue;
+ eventQueue.enqueue(ctx);
}
/**
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
- final FlowCycleQueue queue = this.cycleQueue;
- final FlowEventQueue timerQueue = this.eventQueue;
+ final FlowCycleQueue cycleQueue = this.cycleQueue;
+ final FlowEventQueue eventQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
@@ -156,7 +156,7 @@ public final class FlowEngine implements Runnable {
// Execute all scheduled updates at current timestamp
while (true) {
- final FlowNode ctx = timerQueue.poll(now);
+ final FlowNode ctx = eventQueue.poll(now);
if (ctx == null) {
break;
}
@@ -166,7 +166,7 @@ public final class FlowEngine implements Runnable {
// Execute all immediate updates
while (true) {
- final FlowNode ctx = queue.poll();
+ final FlowNode ctx = cycleQueue.poll();
if (ctx == null) {
break;
}
@@ -178,7 +178,7 @@ public final class FlowEngine implements Runnable {
}
// Schedule an engine invocation for the next update to occur.
- long headDeadline = timerQueue.peekDeadline();
+ long headDeadline = eventQueue.peekDeadline();
if (headDeadline != Long.MAX_VALUE && headDeadline >= now) {
trySchedule(futureInvocations, now, headDeadline);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 16bb161f..ff7ff199 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -24,6 +24,9 @@ package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
@@ -36,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private double currentIncomingSupply; // The current supply provided by the supplier
private boolean outgoingDemandUpdateNeeded = false;
+ private final Set<Integer> updatedDemands =
+ new HashSet<>(); // Array of consumers that updated their demand in this cycle
private boolean overloaded = false;
private double capacity; // What is the max capacity. Can probably be removed
- private final ArrayList<Integer> updatedDemands = new ArrayList<>();
-
public FlowDistributor(FlowGraph graph) {
super(graph);
}
@@ -68,7 +71,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return Long.MAX_VALUE;
}
- this.updateOutgoingSupplies();
+ if (!this.outgoingSupplies.isEmpty()) {
+ this.updateOutgoingSupplies();
+ }
return Long.MAX_VALUE;
}
@@ -100,7 +105,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
// provide all consumers with their demand
if (this.overloaded) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
}
}
@@ -190,23 +195,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.totalIncomingDemand -= consumerEdge.getDemand();
+ // Remove idx from consumers that updated their demands
+ this.updatedDemands.remove(idx);
+
this.consumerEdges.remove(idx);
this.incomingDemands.remove(idx);
this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
- this.consumerEdges.get(i).setConsumerIndex(i);
+ FlowEdge other = this.consumerEdges.get(i);
+
+ other.setConsumerIndex(other.getConsumerIndex() - 1);
}
- for (int i = 0; i < this.updatedDemands.size(); i++) {
- int j = this.updatedDemands.get(i);
+ for (int idx_other : this.updatedDemands) {
- if (j == idx) {
- this.updatedDemands.remove(idx);
- }
- if (j > idx) {
- this.updatedDemands.set(i, j - 1);
+ if (idx_other > idx) {
+ this.updatedDemands.remove(idx_other);
+ this.updatedDemands.add(idx_other - 1);
}
}
@@ -220,7 +227,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.capacity = 0;
this.currentIncomingSupply = 0;
- this.invalidate();
+ this.updatedDemands.clear();
+
+ this.closeNode();
}
@Override
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
index 0e6e137c..91662950 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
@@ -63,7 +63,7 @@ public class FlowGraph {
// Remove all edges connected to node
final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node);
- while (connectedEdges.size() > 0) {
+ while (!connectedEdges.isEmpty()) {
removeEdge(connectedEdges.get(0));
}
@@ -90,7 +90,7 @@ public class FlowGraph {
throw new IllegalArgumentException("The consumer is not a node in this graph");
}
if (!(this.nodes.contains((FlowNode) flowSupplier))) {
- throw new IllegalArgumentException("The consumer is not a node in this graph");
+ throw new IllegalArgumentException("The supplier is not a node in this graph");
}
final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier);