summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-10-25 13:32:41 +0200
committerGitHub <noreply@github.com>2024-10-25 13:32:41 +0200
commit5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch)
tree72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc
parent27f5b7dcb05aefdab9b762175d538931face0aba (diff)
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests. Improved checkpointing model Improved model, started with SimPowerSource implemented FailureModels and Checkpointing First working version midway commit first update All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. * fixed merge conflicts * Updated M3SA paths. * Fixed small typo
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt39
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt23
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt2
8 files changed, 37 insertions, 37 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
index 49fa409e..c82e2557 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
@@ -29,13 +29,12 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield
-import org.opendc.compute.api.Task
import org.opendc.compute.api.TaskState
-import org.opendc.compute.api.TaskWatcher
import org.opendc.compute.failure.models.FailureModel
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.workload.VirtualMachine
-import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec
+import org.opendc.compute.simulator.TaskWatcher
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.service.ServiceTask
+import org.opendc.compute.workload.Task
import org.opendc.experiments.base.scenario.specs.FailureModelSpec
import org.opendc.experiments.base.scenario.specs.createFailureModel
import java.time.InstantSource
@@ -50,7 +49,7 @@ import kotlin.math.max
*/
public class RunningTaskWatcher : TaskWatcher {
// TODO: make this changeable
- private val unlockStates: List<TaskState> = listOf(TaskState.DELETED, TaskState.TERMINATED)
+ private val unlockStates: List<TaskState> = listOf(TaskState.DELETED)
private val mutex: Mutex = Mutex()
@@ -63,7 +62,7 @@ public class RunningTaskWatcher : TaskWatcher {
}
override fun onStateChanged(
- task: Task,
+ task: ServiceTask,
newState: TaskState,
) {
if (unlockStates.contains(newState)) {
@@ -73,7 +72,7 @@ public class RunningTaskWatcher : TaskWatcher {
}
/**
- * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
+ * Helper method to replay the specified list of [Task] and suspend execution util all VMs have finished.
*
* @param clock The simulation clock.
* @param trace The trace to simulate.
@@ -83,9 +82,8 @@ public class RunningTaskWatcher : TaskWatcher {
*/
public suspend fun ComputeService.replay(
clock: InstantSource,
- trace: List<VirtualMachine>,
+ trace: List<Task>,
failureModelSpec: FailureModelSpec? = null,
- checkpointModelSpec: CheckpointModelSpec? = null,
seed: Long = 0,
submitImmediately: Boolean = false,
) {
@@ -97,9 +95,6 @@ public suspend fun ComputeService.replay(
createFailureModel(coroutineContext, clock, this, Random(seed), it)
}
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
try {
coroutineScope {
// Start the fault injector
@@ -107,9 +102,9 @@ public suspend fun ComputeService.replay(
var simulationOffset = Long.MIN_VALUE
- for (entry in trace.sortedBy { it.startTime }) {
+ for (entry in trace.sortedBy { it.submissionTime }) {
val now = clock.millis()
- val start = entry.startTime.toEpochMilli()
+ val start = entry.submissionTime.toEpochMilli()
// Set the simulationOffset based on the starting time of the first task
if (simulationOffset == Long.MIN_VALUE) {
@@ -121,25 +116,21 @@ public suspend fun ComputeService.replay(
delay(max(0, (start - now - simulationOffset)))
}
- val checkpointInterval = checkpointModelSpec?.checkpointInterval ?: 0L
- val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L
- val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0
-
- val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling)
+ val workload = entry.trace
val meta = mutableMapOf<String, Any>("workload" to workload)
launch {
val task =
client.newTask(
entry.name,
- image,
client.newFlavor(
entry.name,
entry.cpuCount,
entry.memCapacity,
- meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(),
+ if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(),
),
- meta = meta,
+ workload,
+ meta,
)
val taskWatcher = RunningTaskWatcher()
@@ -150,7 +141,7 @@ public suspend fun ComputeService.replay(
taskWatcher.wait()
// Stop the task after reaching the end-time of the virtual machine
- task.delete()
+// task.delete()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 2bd9dfa3..df5aabf7 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -26,13 +26,13 @@ import me.tongfei.progressbar.ProgressBarBuilder
import me.tongfei.progressbar.ProgressBarStyle
import org.opendc.compute.carbon.CarbonTrace
import org.opendc.compute.carbon.getCarbonTrace
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.createComputeScheduler
import org.opendc.compute.simulator.provisioner.Provisioner
import org.opendc.compute.simulator.provisioner.registerComputeMonitor
import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
-import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
+import org.opendc.compute.simulator.scheduler.createComputeScheduler
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
import org.opendc.compute.topology.clusterTopology
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.experiments.base.scenario.Scenario
@@ -89,14 +89,24 @@ public fun runScenario(
{ createComputeScheduler(scenario.allocationPolicySpec.policyType, Random(it.seeder.nextLong())) },
maxNumFailures = scenario.maxNumFailures,
),
- setupHosts(serviceDomain, topology, optimize = true),
+ setupHosts(serviceDomain, topology),
)
- val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile))
+ val checkpointInterval = scenario.checkpointModelSpec?.checkpointInterval ?: 0L
+ val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L
+ val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0
+
+ val workloadLoader =
+ ComputeWorkloadLoader(
+ File(scenario.workloadSpec.pathToFile),
+ checkpointInterval,
+ checkpointDuration,
+ checkpointIntervalScaling,
+ )
val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed))
val carbonTrace = getCarbonTrace(scenario.carbonTracePath)
- val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli())
+ val startTime = Duration.ofMillis(tasks.minOf { it.submissionTime }.toEpochMilli())
addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id)
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
@@ -104,7 +114,6 @@ public fun runScenario(
timeSource,
tasks,
failureModelSpec = scenario.failureModelSpec,
- checkpointModelSpec = scenario.checkpointModelSpec,
seed = seed,
)
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt
index 160bd783..8ed60b08 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt
@@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
-import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
+import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
import org.opendc.experiments.base.scenario.specs.ExperimentSpec
import java.io.File
import java.io.InputStream
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
index 91cd09ba..f649e4f8 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.base.scenario
-import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
+import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec
import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec
import org.opendc.experiments.base.scenario.specs.ExportModelSpec
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt
index edfdfaf5..ddc11a50 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt
@@ -23,7 +23,7 @@
package org.opendc.experiments.base.scenario.specs
import kotlinx.serialization.Serializable
-import org.opendc.compute.service.scheduler.ComputeSchedulerEnum
+import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum
/**
* specification describing how tasks are allocated
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt
index 60fcf51a..b957ea18 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt
@@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario.specs
import kotlinx.serialization.Serializable
import org.opendc.common.logger.infoNewLine
import org.opendc.common.logger.logger
-import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
+import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
import java.util.UUID
/**
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt
index a27e77bc..c20b4467 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt
@@ -61,7 +61,7 @@ import org.opendc.compute.failure.models.SampleBasedFailureModel
import org.opendc.compute.failure.models.TraceBasedFailureModel
import org.opendc.compute.failure.prefab.FailurePrefab
import org.opendc.compute.failure.prefab.createFailureModelPrefab
-import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.service.ComputeService
import java.io.File
import java.time.InstantSource
import kotlin.coroutines.CoroutineContext
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt
index d7fdb8f4..8f2146f1 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt
@@ -23,7 +23,7 @@
package org.opendc.experiments.base.scenario.specs
import kotlinx.serialization.Serializable
-import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
+import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
@Serializable
public data class ScenarioSpec(