summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-base/src/test/kotlin/org
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-10-25 13:32:41 +0200
committerGitHub <noreply@github.com>2024-10-25 13:32:41 +0200
commit5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch)
tree72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-experiments/opendc-experiments-base/src/test/kotlin/org
parent27f5b7dcb05aefdab9b762175d538931face0aba (diff)
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests. Improved checkpointing model Improved model, started with SimPowerSource implemented FailureModels and Checkpointing First working version midway commit first update All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. * fixed merge conflicts * Updated M3SA paths. * Fixed small typo
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/test/kotlin/org')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt218
1 files changed, 162 insertions, 56 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
index 08eddca0..41d18225 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
@@ -26,26 +26,27 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.simulator.provisioner.Provisioner
import org.opendc.compute.simulator.provisioner.registerComputeMonitor
import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
-import org.opendc.compute.telemetry.ComputeMonitor
-import org.opendc.compute.telemetry.table.HostTableReader
-import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.scheduler.FilterScheduler
+import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
+import org.opendc.compute.simulator.scheduler.filters.RamFilter
+import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
+import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
import org.opendc.compute.topology.clusterTopology
import org.opendc.compute.topology.specs.HostSpec
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
+import org.opendc.compute.workload.Task
import org.opendc.compute.workload.sampleByLoad
import org.opendc.compute.workload.trace
import org.opendc.experiments.base.runner.replay
+import org.opendc.experiments.base.scenario.specs.TraceBasedFailureModelSpec
import org.opendc.simulator.kotlin.runSimulation
import java.io.File
import java.util.Random
@@ -80,18 +81,18 @@ class ScenarioIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
)
- workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 0L, 0L, 0.0)
}
/**
- * Test a large simulation setup.
+ * Test a small simulation setup.
*/
@Test
- fun testLarge() =
+ fun testSingleTask() =
runSimulation {
- val seed = 0L
- val workload = createTestWorkload(1.0, seed)
- val topology = createTopology("multi.json")
+ val seed = 1L
+ val workload = createTestWorkload("single_task", 1.0, seed)
+ val topology = createTopology("single.json")
val monitor = monitor
Provisioner(dispatcher, seed).use { provisioner ->
@@ -116,16 +117,11 @@ class ScenarioIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
- { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
- { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
- { assertEquals(43101769345, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(3489430672, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(3.3388920269258898E7, monitor.powerDraw, 1E4) { "Incorrect power draw" } },
- { assertEquals(1.0016127451211525E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}
@@ -133,12 +129,13 @@ class ScenarioIntegrationTest {
* Test a small simulation setup.
*/
@Test
- fun testSmall() =
+ fun testSingleTaskSingleFailure() =
runSimulation {
val seed = 1L
- val workload = createTestWorkload(0.25, seed)
+ val workload = createTestWorkload("single_task", 1.0, seed)
val topology = createTopology("single.json")
val monitor = monitor
+ val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/single_failure.parquet")
Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
@@ -148,7 +145,7 @@ class ScenarioIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed)
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
}
println(
@@ -162,24 +159,25 @@ class ScenarioIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(1373419781, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(1217668222, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2539987.394500494, monitor.powerDraw, 1E4) { "Incorrect power draw" } },
- { assertEquals(7.617527900379665E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}
/**
- * Test a small simulation setup with interference.
- * TODO: Interference is currently removed from OpenDC. Reactivate when interference is back in.
+ * Test a small simulation setup.
*/
- fun testInterference() =
+ @Test
+ fun testSingleTask11Failures() =
runSimulation {
- val seed = 0L
- val workload = createTestWorkload(1.0, seed)
+ val seed = 1L
+ val workload = createTestWorkload("single_task", 1.0, seed)
val topology = createTopology("single.json")
+ val monitor = monitor
+ val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet")
Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
@@ -189,7 +187,7 @@ class ScenarioIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed)
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
}
println(
@@ -203,22 +201,69 @@ class ScenarioIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } },
+ { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}
/**
- * Test a small simulation setup with failures.
- * FIXME: Currently failures do not work. reactivate this test when Failures are implemented again
+ * Test a small simulation setup.
*/
- fun testFailures() =
+ @Test
+ fun testSingleTaskCheckpoint() =
runSimulation {
- val seed = 0L
+ val seed = 1L
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 1000000L, 1000L, 1.0)
+ val workload = createTestWorkload("single_task", 1.0, seed)
+ val topology = createTopology("single.json")
+ val monitor = monitor
+ val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet")
+
+ Provisioner(dispatcher, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+ registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
+ setupHosts(serviceDomain = "compute.opendc.org", topology),
+ )
+
+ val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
+ }
+
+ println(
+ "Scheduler " +
+ "Success=${monitor.attemptsSuccess} " +
+ "Failure=${monitor.attemptsFailure} " +
+ "Error=${monitor.attemptsError} " +
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } },
+ { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } },
+ { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ )
+ }
+
+ /**
+ * Test a small simulation setup.
+ */
+ @Test
+ fun testSmall() =
+ runSimulation {
+ val seed = 1L
+ val workload = createTestWorkload("bitbrains-small", 0.25, seed)
val topology = createTopology("single.json")
- val workload = createTestWorkload(0.25, seed)
val monitor = monitor
Provisioner(dispatcher, seed).use { provisioner ->
@@ -229,16 +274,72 @@ class ScenarioIntegrationTest {
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed, failureModelSpec = null)
+ service.replay(timeSource, workload, seed = seed)
}
+ println(
+ "Scheduler " +
+ "Success=${monitor.attemptsSuccess} " +
+ "Failure=${monitor.attemptsFailure} " +
+ "Error=${monitor.attemptsError} " +
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
+ )
+
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(1803918601, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(787181585, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } },
+ { assertEquals(6.756768E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ )
+ }
+
+ /**
+ * Test a large simulation setup.
+ */
+ @Test
+ fun testLarge() =
+ runSimulation {
+ val seed = 0L
+ val workload = createTestWorkload("bitbrains-small", 1.0, seed)
+ val topology = createTopology("multi.json")
+ val monitor = monitor
+
+ Provisioner(dispatcher, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+ registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
+ setupHosts(serviceDomain = "compute.opendc.org", topology),
+ )
+
+ val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+ service.replay(timeSource, workload, seed = seed)
+ }
+
+ println(
+ "Scheduler " +
+ "Success=${monitor.attemptsSuccess} " +
+ "Failure=${monitor.attemptsFailure} " +
+ "Error=${monitor.attemptsError} " +
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
+ { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
+ { assertEquals(43101788258, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(3489412702, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(1.0016592256E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}
@@ -246,10 +347,11 @@ class ScenarioIntegrationTest {
* Obtain the trace reader for the test.
*/
private fun createTestWorkload(
+ traceName: String,
fraction: Double,
seed: Long,
- ): List<VirtualMachine> {
- val source = trace("bitbrains-small").sampleByLoad(fraction)
+ ): List<Task> {
+ val source = trace(traceName).sampleByLoad(fraction)
return source.resolve(workloadLoader, Random(seed))
}
@@ -257,7 +359,7 @@ class ScenarioIntegrationTest {
* Obtain the topology factory for the test.
*/
private fun createTopology(name: String): List<HostSpec> {
- val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name"))
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name"))
return stream.use { clusterTopology(stream) }
}
@@ -267,13 +369,17 @@ class ScenarioIntegrationTest {
var attemptsError = 0
var tasksPending = 0
var tasksActive = 0
+ var tasksTerminated = 0
+ var tasksCompleted = 0
override fun record(reader: ServiceTableReader) {
attemptsSuccess = reader.attemptsSuccess
attemptsFailure = reader.attemptsFailure
- attemptsError = reader.attemptsError
+ attemptsError = 0
tasksPending = reader.tasksPending
tasksActive = reader.tasksActive
+ tasksTerminated = reader.tasksTerminated
+ tasksCompleted = reader.tasksCompleted
}
var idleTime = 0L