From 8bbc3de611f9a679b5fb542241d32f887b4fe921 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 6 Dec 2024 15:44:09 +0100 Subject: Renamed Multiplexer to FlowDistributor (#282) * Restructured opendc-simulator-flow. Renamed Multiplexer to FlowDistributor. * spotless applied * Added FlowDistributor topologies back --- .../org/opendc/compute/simulator/host/SimHost.kt | 6 +- .../simulator/provisioner/HostsProvisioningStep.kt | 6 +- .../org/opendc/compute/topology/specs/HostSpec.kt | 3 +- .../opendc/experiments/base/FlowDistributorTest.kt | 591 +++++++++++++++++++++ .../org/opendc/experiments/base/MultiplexerTest.kt | 591 --------------------- .../FlowDistributor/topologies/single_1_2000.json | 22 + .../FlowDistributor/topologies/single_2_2000.json | 22 + .../Multiplexer/topologies/single_1_2000.json | 22 - .../Multiplexer/topologies/single_2_2000.json | 22 - .../simulator/compute/SimMachineBenchmarks.kt | 162 ------ .../org/opendc/simulator/compute/cpu/SimCpu.java | 10 +- .../simulator/compute/machine/SimMachine.java | 14 +- .../simulator/compute/machine/VirtualMachine.java | 10 +- .../opendc/simulator/compute/memory/Memory.java | 2 +- .../simulator/compute/power/CarbonModel.java | 4 +- .../simulator/compute/power/SimPowerSource.java | 8 +- .../org/opendc/simulator/compute/power/SimPsu.java | 10 +- .../simulator/compute/workload/ChainWorkload.java | 2 +- .../compute/workload/CheckpointModel.java | 4 +- .../compute/workload/SimChainWorkload.java | 6 +- .../compute/workload/SimTraceWorkload.java | 10 +- .../simulator/compute/workload/SimWorkload.java | 6 +- .../simulator/compute/workload/TraceWorkload.java | 2 +- .../simulator/compute/workload/Workload.java | 2 +- .../java/org/opendc/simulator/Multiplexer.java | 247 --------- .../org/opendc/simulator/engine/FlowConsumer.java | 34 -- .../java/org/opendc/simulator/engine/FlowEdge.java | 131 ----- .../org/opendc/simulator/engine/FlowEngine.java | 204 ------- .../org/opendc/simulator/engine/FlowGraph.java | 112 ---- .../java/org/opendc/simulator/engine/FlowNode.java | 191 ------- .../org/opendc/simulator/engine/FlowNodeQueue.java | 109 ---- .../org/opendc/simulator/engine/FlowSupplier.java | 36 -- .../opendc/simulator/engine/FlowTimerQueue.java | 205 ------- .../opendc/simulator/engine/InvocationStack.java | 94 ---- .../opendc/simulator/engine/engine/FlowEngine.java | 206 +++++++ .../simulator/engine/engine/FlowNodeQueue.java | 110 ++++ .../simulator/engine/engine/FlowTimerQueue.java | 206 +++++++ .../simulator/engine/engine/InvocationStack.java | 94 ++++ .../simulator/engine/graph/FlowConsumer.java | 34 ++ .../simulator/engine/graph/FlowDistributor.java | 248 +++++++++ .../opendc/simulator/engine/graph/FlowEdge.java | 131 +++++ .../opendc/simulator/engine/graph/FlowGraph.java | 113 ++++ .../opendc/simulator/engine/graph/FlowNode.java | 230 ++++++++ .../simulator/engine/graph/FlowSupplier.java | 36 ++ .../src/test/kotlin/InvocationStackTest.kt | 2 +- 45 files changed, 2096 insertions(+), 2214 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json delete mode 100644 opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index e1ccdfaf..f34e135d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -31,12 +31,12 @@ import org.opendc.compute.simulator.telemetry.GuestCpuStats import org.opendc.compute.simulator.telemetry.GuestSystemStats import org.opendc.compute.simulator.telemetry.HostCpuStats import org.opendc.compute.simulator.telemetry.HostSystemStats -import org.opendc.simulator.Multiplexer import org.opendc.simulator.compute.cpu.CpuPowerModel import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.models.MachineModel import org.opendc.simulator.compute.models.MemoryUnit -import org.opendc.simulator.engine.FlowGraph +import org.opendc.simulator.engine.graph.FlowDistributor +import org.opendc.simulator.engine.graph.FlowGraph import java.time.Duration import java.time.Instant import java.time.InstantSource @@ -62,7 +62,7 @@ public class SimHost( private val graph: FlowGraph, private val machineModel: MachineModel, private val cpuPowerModel: CpuPowerModel, - private val powerMux: Multiplexer, + private val powerMux: FlowDistributor, ) : AutoCloseable { /** * The event listeners registered with this host. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 8e7293c8..8dcf2fa1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -27,9 +27,9 @@ import org.opendc.compute.simulator.host.SimHost import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec -import org.opendc.simulator.Multiplexer import org.opendc.simulator.compute.power.SimPowerSource -import org.opendc.simulator.engine.FlowEngine +import org.opendc.simulator.engine.engine.FlowEngine +import org.opendc.simulator.engine.graph.FlowDistributor /** * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. @@ -64,7 +64,7 @@ public class HostsProvisioningStep internal constructor( service.addPowerSource(simPowerSource) simPowerSources.add(simPowerSource) - val powerMux = Multiplexer(graph) + val powerMux = FlowDistributor(graph) graph.addEdge(powerMux, simPowerSource) // Create hosts, they are connected to the powerMux when SimMachine is created diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt index 1956ffde..0d0dd24d 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/specs/HostSpec.kt @@ -33,8 +33,7 @@ import java.util.UUID * @param name The name of the host. * @param meta The metadata of the host. * @param model The physical model of the machine. - * @param cpuPowerModel The [SimPsuFactory] to construct the PSU that models the power consumption of the machine. - * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. + * @param cpuPowerModel The [cpuPowerModel] that determines the power draw based on cpu utilization */ public data class HostSpec( val uid: UUID, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt new file mode 100644 index 00000000..31c7dfd0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -0,0 +1,591 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.simulator.provisioner.Provisioner +import org.opendc.compute.simulator.provisioner.registerComputeMonitor +import org.opendc.compute.simulator.provisioner.setupComputeService +import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.compute.topology.clusterTopology +import org.opendc.compute.topology.specs.ClusterSpec +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.Task +import org.opendc.experiments.base.runner.replay +import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.kotlin.runSimulation +import java.io.File +import java.time.Duration +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.ArrayList +import java.util.UUID + +/** + * Testing suite containing tests that specifically test the FlowDistributor + */ +class FlowDistributorTest { + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestComputeMonitor + + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + private val basePath = "src/test/resources/FlowDistributor" + + /** + * Set up the experimental environment. + */ + @BeforeEach + fun setUp() { + monitor = TestComputeMonitor() + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) + } + + private fun createTestTask( + name: String, + cpuCount: Int = 1, + cpuCapacity: Double = 0.0, + memCapacity: Long = 0L, + submissionTime: String = "1970-01-01T00:00", + duration: Long = 0L, + fragments: ArrayList, + ): Task { + return Task( + UUID.nameUUIDFromBytes(name.toByteArray()), + name, + cpuCount, + cpuCapacity, + memCapacity, + 1800000.0, + LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), + duration, + TraceWorkload( + fragments, + ), + ) + } + + private fun runTest( + topology: List, + workload: ArrayList, + ): TestComputeMonitor { + runSimulation { + val monitor = monitor + val seed = 0L + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload) + } + } + return monitor + } + + /** + * FlowDistributor test 1: A single fitting task + * In this test, a single task is scheduled that should fit the FlowDistributor + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor1() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * FlowDistributor test 2: A single overloaded task + * In this test, a single task is scheduled that does not fit the FlowDistributor + * In this test we expect the usage to be lower than the demand. + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor2() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 3000.0, 1), + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 3: A single task transition fit to overloaded + * In this test, a single task is scheduled where the first fragment fits, but the second does not. + * For the first fragment, we expect the usage of the second fragment to be lower than the demand + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor3() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 4: A single task transition overload to fit + * In this test, a single task is scheduled where the first fragment does not fit, and the second does. + * For the first fragment, we expect the usage of the first fragment to be lower than the demand + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor4() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 5: Two task, same time, both fit + * In this test, two tasks are scheduled, and they fit together on the host . The tasks start and finish at the same time + * This test shows how the FlowDistributor handles two tasks that can fit and no redistribution is required. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor5() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 3000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 3000.0, 1), + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + val topology = createTopology("single_2_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 6: Two task, same time, can not fit + * In this test, two tasks are scheduled, and they can not both fit. The tasks start and finish at the same time + * This test shows how the FlowDistributor handles two tasks that both do not fit and redistribution is required. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor6() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 6000.0, 1), + TraceFragment(10 * 60 * 1000, 5000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 5000.0, 1), + TraceFragment(10 * 60 * 1000, 6000.0, 1), + ), + ), + ) + val topology = createTopology("single_2_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(6000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(5000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(5000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(6000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 7: Two task, both fit, second task is delayed + * In this test, two tasks are scheduled, the second task is delayed. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor7() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + submissionTime = "2024-02-01T10:00", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + ), + createTestTask( + name = "1", + submissionTime = "2024-02-01T10:05", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + ), + ) + val topology = createTopology("single_2_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 8: Two task, both fit on their own but not together, second task is delayed + * In this test, two tasks are scheduled, the second task is delayed. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + * When the second task comes in, the host is overloaded. + * This test shows how the FlowDistributor can handle redistribution when a new task comes in. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor8() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + submissionTime = "2024-02-01T10:00", + fragments = + arrayListOf( + TraceFragment(20 * 60 * 1000, 3000.0, 1), + ), + ), + createTestTask( + name = "1", + submissionTime = "2024-02-01T10:05", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1500.0, 1), + ), + ), + ) + val topology = createTopology("single_2_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * FlowDistributor test 9: Two task, one changes demand, causing overload + * In this test, two tasks are scheduled, and they can both fit. + * However, task 0 increases its demand which overloads the FlowDistributor. + * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor9() { + val workload: ArrayList = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(5 * 60 * 1000, 1000.0, 1), + TraceFragment(5 * 60 * 1000, 1500.0, 1), + TraceFragment(5 * 60 * 1000, 2500.0, 1), + TraceFragment(5 * 60 * 1000, 1000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(20 * 60 * 1000, 3000.0, 1), + ), + ), + ) + val topology = createTopology("single_2_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(5)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(9)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(14)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["1"]?.get(5)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(9)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(14)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(5500.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, + ) + } + + /** + * Obtain the topology factory for the test. + */ + private fun createTopology(name: String): List { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/FlowDistributor/topologies/$name")) + return stream.use { clusterTopology(stream) } + } + + class TestComputeMonitor : ComputeMonitor { + var hostCpuDemands = ArrayList() + var hostCpuSupplied = ArrayList() + + override fun record(reader: HostTableReader) { + hostCpuDemands.add(reader.cpuDemand) + hostCpuSupplied.add(reader.cpuUsage) + } + + var taskCpuDemands = mutableMapOf>() + var taskCpuSupplied = mutableMapOf>() + + override fun record(reader: TaskTableReader) { + val taskName: String = reader.taskInfo.name + + if (taskName in taskCpuDemands) { + taskCpuDemands[taskName]?.add(reader.cpuDemand) + taskCpuSupplied[taskName]?.add(reader.cpuUsage) + } else { + taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) + taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) + } + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt deleted file mode 100644 index 049e0d32..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt +++ /dev/null @@ -1,591 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.base - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.simulator.provisioner.Provisioner -import org.opendc.compute.simulator.provisioner.registerComputeMonitor -import org.opendc.compute.simulator.provisioner.setupComputeService -import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.filters.ComputeFilter -import org.opendc.compute.simulator.scheduler.filters.RamFilter -import org.opendc.compute.simulator.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.ComputeMonitor -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.TaskTableReader -import org.opendc.compute.topology.clusterTopology -import org.opendc.compute.topology.specs.ClusterSpec -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.Task -import org.opendc.experiments.base.runner.replay -import org.opendc.simulator.compute.workload.TraceFragment -import org.opendc.simulator.compute.workload.TraceWorkload -import org.opendc.simulator.kotlin.runSimulation -import java.io.File -import java.time.Duration -import java.time.LocalDateTime -import java.time.ZoneId -import java.util.ArrayList -import java.util.UUID - -/** - * Testing suite containing tests that specifically test the Multiplexer - */ -class MultiplexerTest { - /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestComputeMonitor - - /** - * The [FilterScheduler] to use for all experiments. - */ - private lateinit var computeScheduler: FilterScheduler - - /** - * The [ComputeWorkloadLoader] responsible for loading the traces. - */ - private lateinit var workloadLoader: ComputeWorkloadLoader - - private val basePath = "src/test/resources/Multiplexer" - - /** - * Set up the experimental environment. - */ - @BeforeEach - fun setUp() { - monitor = TestComputeMonitor() - computeScheduler = - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)), - ) - workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) - } - - private fun createTestTask( - name: String, - cpuCount: Int = 1, - cpuCapacity: Double = 0.0, - memCapacity: Long = 0L, - submissionTime: String = "1970-01-01T00:00", - duration: Long = 0L, - fragments: ArrayList, - ): Task { - return Task( - UUID.nameUUIDFromBytes(name.toByteArray()), - name, - cpuCount, - cpuCapacity, - memCapacity, - 1800000.0, - LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), - duration, - TraceWorkload( - fragments, - ), - ) - } - - private fun runTest( - topology: List, - workload: ArrayList, - ): TestComputeMonitor { - runSimulation { - val monitor = monitor - val seed = 0L - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload) - } - } - return monitor - } - - /** - * Multiplexer test 1: A single fitting task - * In this test, a single task is scheduled that should fit the Multiplexer - * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer1() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 1000.0, 1), - TraceFragment(10 * 60 * 1000, 2000.0, 1), - ), - ), - ) - val topology = createTopology("single_1_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, - ) - } - - /** - * Multiplexer test 2: A single overloaded task - * In this test, a single task is scheduled that does not fit the Multiplexer - * In this test we expect the usage to be lower than the demand. - * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer2() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 3000.0, 1), - TraceFragment(10 * 60 * 1000, 4000.0, 1), - ), - ), - ) - val topology = createTopology("single_1_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 3: A single task transition fit to overloaded - * In this test, a single task is scheduled where the first fragment fits, but the second does not. - * For the first fragment, we expect the usage of the second fragment to be lower than the demand - * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer3() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 1000.0, 1), - TraceFragment(10 * 60 * 1000, 4000.0, 1), - ), - ), - ) - val topology = createTopology("single_1_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 4: A single task transition overload to fit - * In this test, a single task is scheduled where the first fragment does not fit, and the second does. - * For the first fragment, we expect the usage of the first fragment to be lower than the demand - * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer4() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 4000.0, 1), - TraceFragment(10 * 60 * 1000, 1000.0, 1), - ), - ), - ) - val topology = createTopology("single_1_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 5: Two task, same time, both fit - * In this test, two tasks are scheduled, and they fit together on the host . The tasks start and finish at the same time - * This test shows how the multiplexer handles two tasks that can fit and no redistribution is required. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer5() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 1000.0, 1), - TraceFragment(10 * 60 * 1000, 3000.0, 1), - ), - ), - createTestTask( - name = "1", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 3000.0, 1), - TraceFragment(10 * 60 * 1000, 1000.0, 1), - ), - ), - ) - val topology = createTopology("single_2_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 6: Two task, same time, can not fit - * In this test, two tasks are scheduled, and they can not both fit. The tasks start and finish at the same time - * This test shows how the multiplexer handles two tasks that both do not fit and redistribution is required. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer6() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 6000.0, 1), - TraceFragment(10 * 60 * 1000, 5000.0, 1), - ), - ), - createTestTask( - name = "1", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 5000.0, 1), - TraceFragment(10 * 60 * 1000, 6000.0, 1), - ), - ), - ) - val topology = createTopology("single_2_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(6000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(5000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(5000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(6000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(11000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 7: Two task, both fit, second task is delayed - * In this test, two tasks are scheduled, the second task is delayed. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer7() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - submissionTime = "2024-02-01T10:00", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 1000.0, 1), - TraceFragment(10 * 60 * 1000, 2000.0, 1), - ), - ), - createTestTask( - name = "1", - submissionTime = "2024-02-01T10:05", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 2000.0, 1), - ), - ), - ) - val topology = createTopology("single_2_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 8: Two task, both fit on their own but not together, second task is delayed - * In this test, two tasks are scheduled, the second task is delayed. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - * When the second task comes in, the host is overloaded. - * This test shows how the multiplexer can handle redistribution when a new task comes in. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer8() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - submissionTime = "2024-02-01T10:00", - fragments = - arrayListOf( - TraceFragment(20 * 60 * 1000, 3000.0, 1), - ), - ), - createTestTask( - name = "1", - submissionTime = "2024-02-01T10:05", - fragments = - arrayListOf( - TraceFragment(10 * 60 * 1000, 1500.0, 1), - ), - ), - ) - val topology = createTopology("single_2_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Multiplexer test 9: Two task, one changes demand, causing overload - * In this test, two tasks are scheduled, and they can both fit. - * However, task 0 increases its demand which overloads the multiplexer. - * This test shows how the multiplexer handles transition from fitting to overloading when multiple tasks are running. - * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. - */ - @Test - fun testMultiplexer9() { - val workload: ArrayList = - arrayListOf( - createTestTask( - name = "0", - fragments = - arrayListOf( - TraceFragment(5 * 60 * 1000, 1000.0, 1), - TraceFragment(5 * 60 * 1000, 1500.0, 1), - TraceFragment(5 * 60 * 1000, 2500.0, 1), - TraceFragment(5 * 60 * 1000, 1000.0, 1), - ), - ), - createTestTask( - name = "1", - fragments = - arrayListOf( - TraceFragment(20 * 60 * 1000, 3000.0, 1), - ), - ), - ) - val topology = createTopology("single_2_2000.json") - - monitor = runTest(topology, workload) - - assertAll( - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(5)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(9)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(14)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2500.0, monitor.taskCpuSupplied["1"]?.get(5)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(9)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(14)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(5500.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, - ) - } - - /** - * Obtain the topology factory for the test. - */ - private fun createTopology(name: String): List { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Multiplexer/topologies/$name")) - return stream.use { clusterTopology(stream) } - } - - class TestComputeMonitor : ComputeMonitor { - var hostCpuDemands = ArrayList() - var hostCpuSupplied = ArrayList() - - override fun record(reader: HostTableReader) { - hostCpuDemands.add(reader.cpuDemand) - hostCpuSupplied.add(reader.cpuUsage) - } - - var taskCpuDemands = mutableMapOf>() - var taskCpuSupplied = mutableMapOf>() - - override fun record(reader: TaskTableReader) { - val taskName: String = reader.taskInfo.name - - if (taskName in taskCpuDemands) { - taskCpuDemands[taskName]?.add(reader.cpuDemand) - taskCpuSupplied[taskName]?.add(reader.cpuUsage) - } else { - taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) - taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) - } - } - } -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json new file mode 100644 index 00000000..6790a10f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json @@ -0,0 +1,22 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 1, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json new file mode 100644 index 00000000..4bab620a --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json @@ -0,0 +1,22 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 2, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json deleted file mode 100644 index 6790a10f..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 1, - "coreSpeed": 2000 - }, - "memory": { - "memorySize": 140457600000 - } - } - ] - } - ] -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json deleted file mode 100644 index 4bab620a..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 2, - "coreSpeed": 2000 - }, - "memory": { - "memorySize": 140457600000 - } - } - ] - } - ] -} diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt deleted file mode 100644 index 8d8f4ef8..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import org.opendc.simulator.compute.old.SimBareMetalMachine -import org.opendc.simulator.compute.old.kernel.SimHypervisor -import org.opendc.simulator.compute.old.model.CpuModel -import org.opendc.simulator.compute.old.model.MachineModel -import org.opendc.simulator.compute.old.model.MemoryUnit -import org.opendc.simulator.compute.old.model.ProcessingNode -import org.opendc.simulator.compute.old.workload.SimTrace -import org.opendc.simulator.flow2.FlowEngine -import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory -import org.opendc.simulator.kotlin.runSimulation -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Fork -import org.openjdk.jmh.annotations.Measurement -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.Setup -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Warmup -import java.util.SplittableRandom -import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -class SimMachineBenchmarks { - private lateinit var machineModel: MachineModel - private lateinit var trace: SimTrace - - @Setup - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - - machineModel = - MachineModel( - // cpus - List(cpuNode.coreCount) { - CpuModel( - cpuNode, - it, - 1000.0, - ) - }, - // memory - List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }, - ) - - val random = ThreadLocalRandom.current() - val builder = SimTrace.builder() - repeat(1000000) { - val timestamp = it.toLong() * 1000 - val deadline = timestamp + 1000 - builder.add(deadline, random.nextDouble(0.0, 4500.0), 1) - } - trace = builder.build() - } - - @Benchmark - fun benchmarkBareMetal() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - return@runSimulation machine.runWorkload(trace.createWorkload(0)) - } - } - - @Benchmark - fun benchmarkSpaceSharedHypervisor() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - val vm = hypervisor.newMachine(machineModel) - - try { - return@runSimulation vm.runWorkload(trace.createWorkload(0)) - } finally { - vm.cancel() - machine.cancel() - } - } - } - - @Benchmark - fun benchmarkFairShareHypervisorSingle() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - val vm = hypervisor.newMachine(machineModel) - - try { - return@runSimulation vm.runWorkload(trace.createWorkload(0)) - } finally { - vm.cancel() - machine.cancel() - } - } - } - - @Benchmark - fun benchmarkFairShareHypervisorDouble() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - coroutineScope { - repeat(2) { - val vm = hypervisor.newMachine(machineModel) - - launch { - try { - vm.runWorkload(trace.createWorkload(0)) - } finally { - machine.cancel() - } - } - } - } - machine.cancel() - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index 63331a6c..c5b8a9ea 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -24,11 +24,11 @@ package org.opendc.simulator.compute.cpu; import org.opendc.simulator.compute.machine.PerformanceCounters; import org.opendc.simulator.compute.models.CpuModel; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimCpu} of a machine. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 8364324a..074f0ed8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.machine; import java.time.InstantSource; import java.util.function.Consumer; -import org.opendc.simulator.Multiplexer; import org.opendc.simulator.compute.cpu.CpuPowerModel; import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.memory.Memory; @@ -32,7 +31,8 @@ import org.opendc.simulator.compute.models.MachineModel; import org.opendc.simulator.compute.power.SimPsu; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.FlowGraph; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowGraph; /** * A machine that is able to execute {@link SimWorkload} objects. @@ -44,7 +44,7 @@ public class SimMachine { private final InstantSource clock; private SimCpu cpu; - private Multiplexer cpuMux; + private FlowDistributor cpuMux; private SimPsu psu; private Memory memory; @@ -74,7 +74,7 @@ public class SimMachine { return cpu; } - public Multiplexer getCpuMux() { + public FlowDistributor getCpuMux() { return cpuMux; } @@ -114,7 +114,7 @@ public class SimMachine { public SimMachine( FlowGraph graph, MachineModel machineModel, - Multiplexer powerMux, + FlowDistributor powerMux, CpuPowerModel cpuPowerModel, Consumer completion) { this.graph = graph; @@ -132,8 +132,8 @@ public class SimMachine { this.memory = new Memory(graph, this.machineModel.getMemory()); - // Create a Multiplexer and add the cpu as supplier - this.cpuMux = new Multiplexer(this.graph); + // Create a FlowDistributor and add the cpu as supplier + this.cpuMux = new FlowDistributor(this.graph); graph.addEdge(this.cpuMux, this.cpu); this.completion = completion; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java index 15a1b1c4..b8a9c738 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java @@ -26,11 +26,11 @@ import java.util.function.Consumer; import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /* A virtual Machine created to run a single workload diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java index 2656a99a..d4406b20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.memory; import org.opendc.simulator.compute.models.MemoryUnit; -import org.opendc.simulator.engine.FlowGraph; +import org.opendc.simulator.engine.graph.FlowGraph; /** * The [SimMemory] implementation for a machine. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java index 98ef2b72..91095c01 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java @@ -23,8 +23,8 @@ package org.opendc.simulator.compute.power; import java.util.List; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * CarbonModel used to provide the Carbon Intensity of a {@link SimPowerSource} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index ea500c81..e8626e40 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -24,10 +24,10 @@ package org.opendc.simulator.compute.power; import java.util.List; import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimPsu} implementation that estimates the power consumption based on CPU usage. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index 709d3e15..c1e8a1b9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -23,11 +23,11 @@ package org.opendc.simulator.compute.power; import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimPsu} implementation that estimates the power consumption based on CPU usage. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index 78e8b5d4..ecd4c47f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public class ChainWorkload implements Workload { private ArrayList workloads; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index 723c450d..f4f7cdd6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -29,8 +29,8 @@ package org.opendc.simulator.compute.workload; import java.time.InstantSource; import org.jetbrains.annotations.NotNull; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; public class CheckpointModel extends FlowNode { private SimWorkload simWorkload; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 5b7c10bb..75bdde92 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimChainWorkload} that composes multiple {@link SimWorkload}s. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index 59994fe6..72c095dc 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -23,11 +23,11 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private LinkedList remainingFragments; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index b5c89941..2919fc3a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -22,9 +22,9 @@ package org.opendc.simulator.compute.workload; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * A model that characterizes the runtime behavior of some particular workload. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java index 39bb6111..7f82ab71 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java @@ -24,7 +24,7 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; import java.util.List; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public class TraceWorkload implements Workload { private ArrayList fragments; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index cd34921a..d85669bb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -22,7 +22,7 @@ package org.opendc.simulator.compute.workload; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public interface Workload { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java deleted file mode 100644 index 48177412..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator; - -import java.util.ArrayList; -import java.util.Arrays; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; - -public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { - private final ArrayList consumerEdges = new ArrayList<>(); - private FlowEdge supplierEdge; - - private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers - - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier - - private boolean overLoaded = false; - private int currentConsumerIdx = -1; - - private double capacity; // What is the max capacity - - public Multiplexer(FlowGraph graph) { - super(graph); - } - - public double getTotalDemand() { - return totalDemand; - } - - public double getTotalSupply() { - return totalSupply; - } - - public double getCapacity() { - return capacity; - } - - public long onUpdate(long now) { - - return Long.MAX_VALUE; - } - - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { - - // If this came from a state of over provisioning, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } - - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } - - this.overLoaded = false; - } - - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); - - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); - } - } - } - - private record Demand(int idx, double value) {} - - private static double[] redistributeSupply(ArrayList demands, double totalSupply) { - int inputSize = demands.size(); - - final double[] supplies = new double[inputSize]; - final Demand[] tempDemands = new Demand[inputSize]; - - for (int i = 0; i < inputSize; i++) { - tempDemands[i] = new Demand(i, demands.get(i)); - } - - Arrays.sort(tempDemands, (o1, o2) -> { - Double i1 = o1.value; - Double i2 = o2.value; - return i1.compareTo(i2); - }); - - double availableCapacity = totalSupply; // totalSupply - - for (int i = 0; i < inputSize; i++) { - double d = tempDemands[i].value; - - if (d == 0.0) { - continue; - } - - double availableShare = availableCapacity / (inputSize - i); - double r = Math.min(d, availableShare); - - int idx = tempDemands[i].idx; - supplies[idx] = r; // Update the rates - availableCapacity -= r; - } - - // Return the used capacity - return supplies; - } - - /** - * Add a new consumer. - * Set its demand and supply to 0.0 - */ - @Override - public void addConsumerEdge(FlowEdge consumerEdge) { - consumerEdge.setConsumerIndex(this.consumerEdges.size()); - - this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); - } - - @Override - public void addSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = supplierEdge; - this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; - } - - @Override - public void removeConsumerEdge(FlowEdge consumerEdge) { - int idx = consumerEdge.getConsumerIndex(); - - if (idx == -1) { - return; - } - - this.totalDemand -= consumerEdge.getDemand(); - - this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); - - // update the consumer index for all consumerEdges higher than this. - for (int i = idx; i < this.consumerEdges.size(); i++) { - this.consumerEdges.get(i).setConsumerIndex(i); - } - - this.currentConsumerIdx = -1; - - if (this.overLoaded) { - this.distributeSupply(); - } - - this.pushDemand(this.supplierEdge, this.totalDemand); - } - - @Override - public void removeSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = null; - this.capacity = 0; - this.totalSupply = 0; - } - - @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { - int idx = consumerEdge.getConsumerIndex(); - - this.currentConsumerIdx = idx; - - if (idx == -1) { - System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); - return; - } - - // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); - - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); - - if (overLoaded) { - distributeSupply(); - } - - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); - } - - @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; - - this.distributeSupply(); - } - - @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { - this.supplierEdge.pushDemand(newDemand); - } - - @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { - int idx = consumerEdge.getConsumerIndex(); - - if (idx == -1) { - System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); - } - - if (supplies.get(idx) == newSupply) { - return; - } - - supplies.set(idx, newSupply); - consumerEdge.pushSupply(newSupply); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java deleted file mode 100644 index ddb40794..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -public interface FlowConsumer { - - void handleSupply(FlowEdge supplierEdge, double newSupply); - - void pushDemand(FlowEdge supplierEdge, double newDemand); - - void addSupplierEdge(FlowEdge supplierEdge); - - void removeSupplierEdge(FlowEdge supplierEdge); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java deleted file mode 100644 index 95fe7928..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -/** - * An edge that connects two FlowStages. - * A connection between FlowStages always consist of a FlowStage that demands - * something, and a FlowStage that Delivers something - * For instance, this could be the connection between a workload, and its machine - */ -public class FlowEdge { - private FlowConsumer consumer; - private FlowSupplier supplier; - - private int consumerIndex = -1; - private int supplierIndex = -1; - - private double demand = 0.0; - private double supply = 0.0; - - private double capacity; - - public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { - if (!(consumer instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - if (!(supplier instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - - this.consumer = consumer; - this.supplier = supplier; - - this.capacity = supplier.getCapacity(); - - this.consumer.addSupplierEdge(this); - this.supplier.addConsumerEdge(this); - } - - public void close() { - if (this.consumer != null) { - this.consumer.removeSupplierEdge(this); - this.consumer = null; - } - - if (this.supplier != null) { - this.supplier.removeConsumerEdge(this); - this.supplier = null; - } - } - - public FlowConsumer getConsumer() { - return consumer; - } - - public FlowSupplier getSupplier() { - return supplier; - } - - public double getCapacity() { - return capacity; - } - - public double getDemand() { - return this.demand; - } - - public double getSupply() { - return this.supply; - } - - public int getConsumerIndex() { - return consumerIndex; - } - - public void setConsumerIndex(int consumerIndex) { - this.consumerIndex = consumerIndex; - } - - public int getSupplierIndex() { - return supplierIndex; - } - - public void setSupplierIndex(int supplierIndex) { - this.supplierIndex = supplierIndex; - } - - /** - * Push new demand from the Consumer to the Supplier - */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { - return; - } - - this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); - } - - /** - * Push new supply from the Supplier to the Consumer - */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { - return; - } - - this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java deleted file mode 100644 index 10af7c51..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.time.Clock; -import java.time.InstantSource; -import kotlin.coroutines.CoroutineContext; -import org.opendc.common.Dispatcher; - -/** - * A {@link FlowEngine} simulates a generic flow network. - *

- * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation - * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. - */ -public final class FlowEngine implements Runnable { - /** - * The queue of {@link FlowNode} updates that are scheduled for immediate execution. - */ - private final FlowNodeQueue queue = new FlowNodeQueue(256); - - /** - * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. - */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); - - /** - * The stack of engine invocations to occur in the future. - */ - private final InvocationStack futureInvocations = new InvocationStack(256); - - /** - * A flag to indicate that the engine is active. - */ - private boolean active; - - private final Dispatcher dispatcher; - private final InstantSource clock; - - /** - * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. - */ - public static FlowEngine create(Dispatcher dispatcher) { - return new FlowEngine(dispatcher); - } - - FlowEngine(Dispatcher dispatcher) { - this.dispatcher = dispatcher; - this.clock = dispatcher.getTimeSource(); - } - - /** - * Obtain the (virtual) {@link Clock} driving the simulation. - */ - public InstantSource getClock() { - return clock; - } - - /** - * Return a new {@link FlowGraph} that can be used to build a flow network. - */ - public FlowGraph newGraph() { - return new FlowGraph(this); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. - *

- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. - */ - void scheduleImmediate(long now, FlowNode ctx) { - scheduleImmediateInContext(ctx); - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (active) { - return; - } - - trySchedule(futureInvocations, now, now); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. - *

- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. - *

- * This method should only be invoked while inside an engine cycle. - */ - void scheduleImmediateInContext(FlowNode ctx) { - queue.add(ctx); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. - */ - void scheduleDelayed(FlowNode ctx) { - scheduleDelayedInContext(ctx); - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (active) { - return; - } - - long deadline = timerQueue.peekDeadline(); - if (deadline != Long.MAX_VALUE) { - trySchedule(futureInvocations, clock.millis(), deadline); - } - } - - /** - * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. - *

- * This method should only be invoked while inside an engine cycle. - */ - void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; - timerQueue.enqueue(ctx); - } - - /** - * Run all the enqueued actions for the specified timestamp (now). - */ - private void doRunEngine(long now) { - final FlowNodeQueue queue = this.queue; - final FlowTimerQueue timerQueue = this.timerQueue; - - try { - // Mark the engine as active to prevent concurrent calls to this method - active = true; - - // Execute all scheduled updates at current timestamp - while (true) { - final FlowNode ctx = timerQueue.poll(now); - if (ctx == null) { - break; - } - - ctx.update(now); - } - - // Execute all immediate updates - while (true) { - final FlowNode ctx = queue.poll(); - if (ctx == null) { - break; - } - - ctx.update(now); - } - } finally { - active = false; - } - - // Schedule an engine invocation for the next update to occur. - long headDeadline = timerQueue.peekDeadline(); - if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { - trySchedule(futureInvocations, now, headDeadline); - } - } - - @Override - public void run() { - doRunEngine(futureInvocations.poll()); - } - - /** - * Try to schedule an engine invocation at the specified [target]. - * - * @param scheduled The queue of scheduled invocations. - * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the engine invocation should happen. - */ - private void trySchedule(InvocationStack scheduled, long now, long target) { - // Only schedule a new scheduler invocation in case the target is earlier than all other pending - // scheduler invocations - if (scheduled.tryAdd(target)) { - dispatcher.schedule(target - now, this); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java deleted file mode 100644 index d82b542b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.ArrayList; -import java.util.HashMap; - -public class FlowGraph { - private final FlowEngine engine; - private final ArrayList nodes = new ArrayList<>(); - private final ArrayList edges = new ArrayList<>(); - private final HashMap> nodeToEdge = new HashMap<>(); - - public FlowGraph(FlowEngine engine) { - this.engine = engine; - } - - /** - * Return the {@link FlowEngine} driving the simulation of the graph. - */ - public FlowEngine getEngine() { - return engine; - } - - /** - * Create a new {@link FlowNode} representing a node in the flow network. - */ - public void addNode(FlowNode node) { - if (nodes.contains(node)) { - System.out.println("Node already exists"); - } - nodes.add(node); - nodeToEdge.put(node, new ArrayList<>()); - long now = this.engine.getClock().millis(); - node.invalidate(now); - } - - /** - * Internal method to remove the specified {@link FlowNode} from the graph. - */ - public void removeNode(FlowNode node) { - - // Remove all edges connected to node - final ArrayList connectedEdges = nodeToEdge.get(node); - while (connectedEdges.size() > 0) { - removeEdge(connectedEdges.get(0)); - } - - nodeToEdge.remove(node); - - // remove the node - nodes.remove(node); - } - - /** - * Add an edge between the specified consumer and supplier in this graph. - */ - public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { - // Check if the consumer and supplier are both FlowNodes - if (!(flowConsumer instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - if (!(flowSupplier instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - - // Check of the consumer and supplier are present in this graph - if (!(this.nodes.contains((FlowNode) flowConsumer))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); - } - if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); - } - - final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); - - edges.add(flowEdge); - - nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); - nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); - } - - public void removeEdge(FlowEdge flowEdge) { - final FlowConsumer consumer = flowEdge.getConsumer(); - final FlowSupplier supplier = flowEdge.getSupplier(); - nodeToEdge.get((FlowNode) consumer).remove(flowEdge); - nodeToEdge.get((FlowNode) supplier).remove(flowEdge); - - edges.remove(flowEdge); - flowEdge.close(); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java deleted file mode 100644 index d1faf465..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.time.InstantSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link FlowNode} represents a node in a {@link FlowGraph}. - */ -public abstract class FlowNode { - private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); - - protected enum NodeState { - PENDING, // Stage is active, but is not running any updates - UPDATING, // Stage is active, and running an update - INVALIDATED, // Stage is deemed invalid, and should run an update - CLOSING, // Stage is being closed, final updates can still be run - CLOSED // Stage is closed and should not run any updates - } - - protected NodeState nodeState = NodeState.PENDING; - - /** - * The deadline of the stage after which an update should run. - */ - long deadline = Long.MAX_VALUE; - - /** - * The index of the timer in the {@link FlowTimerQueue}. - */ - int timerIndex = -1; - - protected InstantSource clock; - protected FlowGraph parentGraph; - protected FlowEngine engine; - - /** - * Construct a new {@link FlowNode} instance. - * - * @param parentGraph The {@link FlowGraph} this stage belongs to. - */ - public FlowNode(FlowGraph parentGraph) { - this.parentGraph = parentGraph; - this.engine = parentGraph.getEngine(); - this.clock = engine.getClock(); - - this.parentGraph.addNode(this); - } - - /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - - /** - * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). - */ - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } - - public void setTimerIndex(int index) { - this.timerIndex = index; - } - /** - * Invalidate the {@link FlowNode} forcing the stage to update. - * - *

- * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to - * prevent having to re-query the clock. This method should not be called during an update. - */ - public void invalidate(long now) { - // If there is already an update running, - // notify the update, that a next update should be run after - if (this.nodeState == NodeState.UPDATING) { - this.nodeState = NodeState.INVALIDATED; - } else { - engine.scheduleImmediate(now, this); - } - } - - /** - * Invalidate the {@link FlowNode} forcing the stage to update. - */ - public void invalidate() { - invalidate(clock.millis()); - } - - /** - * Update the state of the stage. - */ - public void update(long now) { - this.nodeState = NodeState.UPDATING; - - long newDeadline = this.deadline; - - try { - newDeadline = this.onUpdate(now); - } catch (Exception e) { - doFail(e); - } - - // Check whether the stage is marked as closing. - if (this.nodeState == NodeState.INVALIDATED) { - newDeadline = now; - } - if (this.nodeState == NodeState.CLOSING) { - closeNode(); - return; - } - - this.deadline = newDeadline; - - // Update the timer queue with the new deadline - engine.scheduleDelayedInContext(this); - - this.nodeState = NodeState.PENDING; - } - - /** - * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated. - * - * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. - * @return The next deadline for the stage. - */ - public abstract long onUpdate(long now); - - /** - * This method is invoked when an uncaught exception is caught by the engine. When this happens, the - */ - void doFail(Throwable cause) { - LOGGER.warn("Uncaught exception (closing stage)", cause); - - closeNode(); - } - - /** - * This method is invoked when the {@link FlowNode} exits successfully or due to failure. - */ - public void closeNode() { - if (this.nodeState == NodeState.CLOSED) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); - return; - } - - // If this stage is running an update, notify it that is should close after. - if (this.nodeState == NodeState.UPDATING) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); - this.nodeState = NodeState.CLOSING; - return; - } - - // Mark the stage as closed - this.nodeState = NodeState.CLOSED; - - // Remove stage from parent graph - this.parentGraph.removeNode(this); - - // Remove stage from the timer queue - this.deadline = Long.MAX_VALUE; - this.engine.scheduleDelayedInContext(this); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java deleted file mode 100644 index 37b3c65b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.ArrayDeque; -import java.util.Arrays; - -/** - * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s - * that have been updated during the engine cycle and should converge. - *

- * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -final class FlowNodeQueue { - /** - * The array of elements in the queue. - */ - private FlowNode[] elements; - - private int head = 0; - private int tail = 0; - - public FlowNodeQueue(int initialCapacity) { - elements = new FlowNode[initialCapacity]; - } - - /** - * Add the specified context to the queue. - */ - void add(FlowNode ctx) { - final FlowNode[] es = elements; - int tail = this.tail; - - es[tail] = ctx; - - tail = inc(tail, es.length); - this.tail = tail; - - if (head == tail) { - doubleCapacity(); - } - } - - /** - * Remove a {@link FlowNode} from the queue or null if the queue is empty. - */ - FlowNode poll() { - final FlowNode[] es = elements; - int head = this.head; - FlowNode ctx = es[head]; - - if (ctx != null) { - es[head] = null; - this.head = inc(head, es.length); - } - - return ctx; - } - - /** - * Doubles the capacity of this deque - */ - private void doubleCapacity() { - int oldCapacity = elements.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); - - // Exceptionally, here tail == head needs to be disambiguated - if (tail < head || (tail == head && es[head] != null)) { - // wrap around; slide first leg forward to end of array - int newSpace = newCapacity - oldCapacity; - System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); - for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; - } - } - - /** - * Circularly increments i, mod modulus. - * Precondition and postcondition: 0 <= i < modulus. - */ - private static int inc(int i, int modulus) { - if (++i >= modulus) i = 0; - return i; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java deleted file mode 100644 index 955f4943..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -public interface FlowSupplier { - - void handleDemand(FlowEdge consumerEdge, double newDemand); - - void pushSupply(FlowEdge consumerEdge, double newSupply); - - void addConsumerEdge(FlowEdge consumerEdge); - - void removeConsumerEdge(FlowEdge consumerEdge); - - double getCapacity(); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java deleted file mode 100644 index 1e348b10..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.Arrays; - -/** - * A specialized priority queue for timers of {@link FlowNode}s. - *

- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -public final class FlowTimerQueue { - /** - * Array representation of binary heap of {@link FlowNode} instances. - */ - private FlowNode[] queue; - - /** - * The number of elements in the priority queue. - */ - private int size = 0; - - /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. - * - * @param initialCapacity The initial capacity of the queue. - */ - public FlowTimerQueue(int initialCapacity) { - this.queue = new FlowNode[initialCapacity]; - } - - /** - * Enqueue a timer for the specified context or update the existing timer. - */ - public void enqueue(FlowNode node) { - FlowNode[] es = queue; - int k = node.timerIndex; - - if (node.deadline != Long.MAX_VALUE) { - if (k >= 0) { - update(es, node, k); - } else { - add(es, node); - } - } else if (k >= 0) { - delete(es, k); - } - } - - /** - * Retrieve the head of the queue if its deadline does not exceed now. - * - * @param now The timestamp that the deadline of the head of the queue should not exceed. - * @return The head of the queue if its deadline does not exceed now, otherwise null. - */ - public FlowNode poll(long now) { - if (this.size == 0) { - return null; - } - - final FlowNode[] es = queue; - final FlowNode head = es[0]; - - if (now < head.deadline) { - return null; - } - - int n = size - 1; - this.size = n; - final FlowNode next = es[n]; - es[n] = null; // Clear the last element of the queue - - if (n > 0) { - siftDown(0, next, es, n); - } - - head.timerIndex = -1; - return head; - } - - /** - * Find the earliest deadline in the queue. - */ - public long peekDeadline() { - if (this.size > 0) { - return this.queue[0].deadline; - } - - return Long.MAX_VALUE; - } - - /** - * Add a new entry to the queue. - */ - private void add(FlowNode[] es, FlowNode node) { - if (this.size >= es.length) { - // Re-fetch the resized array - es = grow(); - } - - siftUp(this.size, node, es); - - this.size++; - } - - /** - * Update the deadline of an existing entry in the queue. - */ - private void update(FlowNode[] es, FlowNode node, int k) { - if (k > 0) { - int parent = (k - 1) >>> 1; - if (es[parent].deadline > node.deadline) { - siftUp(k, node, es); - return; - } - } - - siftDown(k, node, es, this.size); - } - - /** - * Deadline an entry from the queue. - */ - private void delete(FlowNode[] es, int k) { - int s = --this.size; - if (s == k) { - es[k] = null; // Element is last in the queue - } else { - FlowNode moved = es[s]; - es[s] = null; - - siftDown(k, moved, es, s); - - if (es[k] == moved) { - siftUp(k, moved, es); - } - } - } - - /** - * Increases the capacity of the array. - */ - private FlowNode[] grow() { - FlowNode[] queue = this.queue; - int oldCapacity = queue.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - - queue = Arrays.copyOf(queue, newCapacity); - this.queue = queue; - return queue; - } - - private static void siftUp(int k, FlowNode key, FlowNode[] es) { - while (k > 0) { - int parent = (k - 1) >>> 1; - FlowNode e = es[parent]; - if (key.deadline >= e.deadline) break; - es[k] = e; - e.timerIndex = k; - k = parent; - } - es[k] = key; - key.timerIndex = k; - } - - private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { - int half = n >>> 1; // loop while a non-leaf - while (k < half) { - int child = (k << 1) + 1; // assume left child is least - FlowNode c = es[child]; - int right = child + 1; - if (right < n && c.deadline > es[right].deadline) c = es[child = right]; - - if (key.deadline <= c.deadline) break; - - es[k] = c; - c.timerIndex = k; - k = child; - } - - es[k] = key; - key.timerIndex = k; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java deleted file mode 100644 index 15da2f23..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.Arrays; - -/** - * A specialized monotonic stack implementation for tracking the scheduled engine invocations. - *

- * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -public final class InvocationStack { - /** - * The array of elements in the stack. - */ - private long[] elements; - - private int head = -1; - - public InvocationStack(int initialCapacity) { - this.elements = new long[initialCapacity]; - Arrays.fill(this.elements, Long.MIN_VALUE); - } - - /** - * Try to add the specified invocation to the monotonic stack. - * - * @param invocation The timestamp of the invocation. - * @return true if the invocation was added, false otherwise. - */ - public boolean tryAdd(long invocation) { - final long[] es = this.elements; - int head = this.head; - - if (head < 0 || es[head] > invocation) { - es[head + 1] = invocation; - this.head = head + 1; - - if (head + 2 == es.length) { - doubleCapacity(); - } - - return true; - } - - return false; - } - - /** - * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. - */ - public long poll() { - int head = this.head--; - - if (head >= 0) { - return this.elements[head]; - } - - return Long.MAX_VALUE; - } - - /** - * Doubles the capacity of this deque - */ - private void doubleCapacity() { - int oldCapacity = this.elements.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - this.elements = Arrays.copyOf(this.elements, newCapacity); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java new file mode 100644 index 00000000..1a068b40 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.time.Clock; +import java.time.InstantSource; +import kotlin.coroutines.CoroutineContext; +import org.opendc.common.Dispatcher; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A {@link FlowEngine} simulates a generic flow network. + *

+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public final class FlowEngine implements Runnable { + /** + * The queue of {@link FlowNode} updates that are scheduled for immediate execution. + */ + private final FlowNodeQueue queue = new FlowNodeQueue(256); + + /** + * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. + */ + private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + + /** + * The stack of engine invocations to occur in the future. + */ + private final InvocationStack futureInvocations = new InvocationStack(256); + + /** + * A flag to indicate that the engine is active. + */ + private boolean active; + + private final Dispatcher dispatcher; + private final InstantSource clock; + + /** + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. + */ + public static FlowEngine create(Dispatcher dispatcher) { + return new FlowEngine(dispatcher); + } + + FlowEngine(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + this.clock = dispatcher.getTimeSource(); + } + + /** + * Obtain the (virtual) {@link Clock} driving the simulation. + */ + public InstantSource getClock() { + return clock; + } + + /** + * Return a new {@link FlowGraph} that can be used to build a flow network. + */ + public FlowGraph newGraph() { + return new FlowGraph(this); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + */ + public void scheduleImmediate(long now, FlowNode ctx) { + scheduleImmediateInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + trySchedule(futureInvocations, now, now); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + *

+ * This method should only be invoked while inside an engine cycle. + */ + public void scheduleImmediateInContext(FlowNode ctx) { + queue.add(ctx); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. + */ + public void scheduleDelayed(FlowNode ctx) { + scheduleDelayedInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + long deadline = timerQueue.peekDeadline(); + if (deadline != Long.MAX_VALUE) { + trySchedule(futureInvocations, clock.millis(), deadline); + } + } + + /** + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. + *

+ * This method should only be invoked while inside an engine cycle. + */ + public void scheduleDelayedInContext(FlowNode ctx) { + FlowTimerQueue timerQueue = this.timerQueue; + timerQueue.enqueue(ctx); + } + + /** + * Run all the enqueued actions for the specified timestamp (now). + */ + private void doRunEngine(long now) { + final FlowNodeQueue queue = this.queue; + final FlowTimerQueue timerQueue = this.timerQueue; + + try { + // Mark the engine as active to prevent concurrent calls to this method + active = true; + + // Execute all scheduled updates at current timestamp + while (true) { + final FlowNode ctx = timerQueue.poll(now); + if (ctx == null) { + break; + } + + ctx.update(now); + } + + // Execute all immediate updates + while (true) { + final FlowNode ctx = queue.poll(); + if (ctx == null) { + break; + } + + ctx.update(now); + } + } finally { + active = false; + } + + // Schedule an engine invocation for the next update to occur. + long headDeadline = timerQueue.peekDeadline(); + if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { + trySchedule(futureInvocations, now, headDeadline); + } + } + + @Override + public void run() { + doRunEngine(futureInvocations.poll()); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param scheduled The queue of scheduled invocations. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + */ + private void trySchedule(InvocationStack scheduled, long now, long target) { + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (scheduled.tryAdd(target)) { + dispatcher.schedule(target - now, this); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java new file mode 100644 index 00000000..bd622083 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.ArrayDeque; +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s + * that have been updated during the engine cycle and should converge. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowNodeQueue { + /** + * The array of elements in the queue. + */ + private FlowNode[] elements; + + private int head = 0; + private int tail = 0; + + public FlowNodeQueue(int initialCapacity) { + elements = new FlowNode[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowNode ctx) { + final FlowNode[] es = elements; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + } + + /** + * Remove a {@link FlowNode} from the queue or null if the queue is empty. + */ + FlowNode poll() { + final FlowNode[] es = elements; + int head = this.head; + FlowNode ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java new file mode 100644 index 00000000..049eb40d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized priority queue for timers of {@link FlowNode}s. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +public final class FlowTimerQueue { + /** + * Array representation of binary heap of {@link FlowNode} instances. + */ + private FlowNode[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowTimerQueue(int initialCapacity) { + this.queue = new FlowNode[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + public void enqueue(FlowNode node) { + FlowNode[] es = queue; + int k = node.getTimerIndex(); + + if (node.getDeadline() != Long.MAX_VALUE) { + if (k >= 0) { + update(es, node, k); + } else { + add(es, node); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + public FlowNode poll(long now) { + if (this.size == 0) { + return null; + } + + final FlowNode[] es = queue; + final FlowNode head = es[0]; + + if (now < head.getDeadline()) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowNode next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.setTimerIndex(-1); + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + public long peekDeadline() { + if (this.size > 0) { + return this.queue[0].getDeadline(); + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowNode[] es, FlowNode node) { + if (this.size >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(this.size, node, es); + + this.size++; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowNode[] es, FlowNode node, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].getDeadline() > node.getDeadline()) { + siftUp(k, node, es); + return; + } + } + + siftDown(k, node, es, this.size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowNode[] es, int k) { + int s = --this.size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowNode moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowNode[] grow() { + FlowNode[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowNode key, FlowNode[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowNode e = es[parent]; + if (key.getDeadline() >= e.getDeadline()) break; + es[k] = e; + e.setTimerIndex(k); + k = parent; + } + es[k] = key; + key.setTimerIndex(k); + } + + private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowNode c = es[child]; + int right = child + 1; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; + + if (key.getDeadline() <= c.getDeadline()) break; + + es[k] = c; + c.setTimerIndex(k); + k = child; + } + + es[k] = key; + key.setTimerIndex(k); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java new file mode 100644 index 00000000..5607278c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; + +/** + * A specialized monotonic stack implementation for tracking the scheduled engine invocations. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +public final class InvocationStack { + /** + * The array of elements in the stack. + */ + private long[] elements; + + private int head = -1; + + public InvocationStack(int initialCapacity) { + this.elements = new long[initialCapacity]; + Arrays.fill(this.elements, Long.MIN_VALUE); + } + + /** + * Try to add the specified invocation to the monotonic stack. + * + * @param invocation The timestamp of the invocation. + * @return true if the invocation was added, false otherwise. + */ + public boolean tryAdd(long invocation) { + final long[] es = this.elements; + int head = this.head; + + if (head < 0 || es[head] > invocation) { + es[head + 1] = invocation; + this.head = head + 1; + + if (head + 2 == es.length) { + doubleCapacity(); + } + + return true; + } + + return false; + } + + /** + * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. + */ + public long poll() { + int head = this.head--; + + if (head >= 0) { + return this.elements[head]; + } + + return Long.MAX_VALUE; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = this.elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + this.elements = Arrays.copyOf(this.elements, newCapacity); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java new file mode 100644 index 00000000..2130d376 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +public interface FlowConsumer { + + void handleSupply(FlowEdge supplierEdge, double newSupply); + + void pushDemand(FlowEdge supplierEdge, double newDemand); + + void addSupplierEdge(FlowEdge supplierEdge); + + void removeSupplierEdge(FlowEdge supplierEdge); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java new file mode 100644 index 00000000..7ef091f8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.util.ArrayList; +import java.util.Arrays; + +public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + private final ArrayList consumerEdges = new ArrayList<>(); + private FlowEdge supplierEdge; + + private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers + + private double totalDemand; // The total demand of all the consumers + private double totalSupply; // The total supply from the supplier + + private boolean overLoaded = false; + private int currentConsumerIdx = -1; + + private double capacity; // What is the max capacity + + public FlowDistributor(FlowGraph graph) { + super(graph); + } + + public double getTotalDemand() { + return totalDemand; + } + + public double getTotalSupply() { + return totalSupply; + } + + public double getCapacity() { + return capacity; + } + + public long onUpdate(long now) { + + return Long.MAX_VALUE; + } + + private void distributeSupply() { + // if supply >= demand -> push supplies to all tasks + if (this.totalSupply > this.totalDemand) { + + // If this came from a state of overload, provide all consumers with their demand + if (this.overLoaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); + } + } + + if (this.currentConsumerIdx != -1) { + this.pushSupply( + this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); + this.currentConsumerIdx = -1; + } + + this.overLoaded = false; + } + + // if supply < demand -> distribute the supply over all consumers + else { + this.overLoaded = true; + double[] supplies = redistributeSupply(this.demands, this.totalSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + } + } + } + + private record Demand(int idx, double value) {} + + /** + * Distributed the available supply over the different demands. + * The supply is distributed using MaxMin Fairness. + * + * TODO: Move this outside of the Distributor so we can easily add different redistribution methods + */ + private static double[] redistributeSupply(ArrayList demands, double totalSupply) { + int inputSize = demands.size(); + + final double[] supplies = new double[inputSize]; + final Demand[] tempDemands = new Demand[inputSize]; + + for (int i = 0; i < inputSize; i++) { + tempDemands[i] = new Demand(i, demands.get(i)); + } + + Arrays.sort(tempDemands, (o1, o2) -> { + Double i1 = o1.value; + Double i2 = o2.value; + return i1.compareTo(i2); + }); + + double availableCapacity = totalSupply; // totalSupply + + for (int i = 0; i < inputSize; i++) { + double d = tempDemands[i].value; + + if (d == 0.0) { + continue; + } + + double availableShare = availableCapacity / (inputSize - i); + double r = Math.min(d, availableShare); + + int idx = tempDemands[i].idx; + supplies[idx] = r; // Update the rates + availableCapacity -= r; + } + + // Return the used capacity + return supplies; + } + + /** + * Add a new consumer. + * Set its demand and supply to 0.0 + */ + @Override + public void addConsumerEdge(FlowEdge consumerEdge) { + consumerEdge.setConsumerIndex(this.consumerEdges.size()); + + this.consumerEdges.add(consumerEdge); + this.demands.add(0.0); + this.supplies.add(0.0); + } + + @Override + public void addSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = supplierEdge; + this.capacity = supplierEdge.getCapacity(); + this.totalSupply = 0; + } + + @Override + public void removeConsumerEdge(FlowEdge consumerEdge) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + return; + } + + this.totalDemand -= consumerEdge.getDemand(); + + this.consumerEdges.remove(idx); + this.demands.remove(idx); + this.supplies.remove(idx); + + // update the consumer index for all consumerEdges higher than this. + for (int i = idx; i < this.consumerEdges.size(); i++) { + this.consumerEdges.get(i).setConsumerIndex(i); + } + + this.currentConsumerIdx = -1; + + if (this.overLoaded) { + this.distributeSupply(); + } + + this.pushDemand(this.supplierEdge, this.totalDemand); + } + + @Override + public void removeSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = null; + this.capacity = 0; + this.totalSupply = 0; + } + + @Override + public void handleDemand(FlowEdge consumerEdge, double newDemand) { + int idx = consumerEdge.getConsumerIndex(); + + this.currentConsumerIdx = idx; + + if (idx == -1) { + System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); + return; + } + + // Update the total demand (This is cheaper than summing over all demands) + double prevDemand = demands.get(idx); + + demands.set(idx, newDemand); + this.totalDemand += (newDemand - prevDemand); + + if (overLoaded) { + distributeSupply(); + } + + // Send new totalDemand to CPU + // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) + this.pushDemand(this.supplierEdge, this.totalDemand); + } + + @Override + public void handleSupply(FlowEdge supplierEdge, double newSupply) { + this.totalSupply = newSupply; + + this.distributeSupply(); + } + + @Override + public void pushDemand(FlowEdge supplierEdge, double newDemand) { + this.supplierEdge.pushDemand(newDemand); + } + + @Override + public void pushSupply(FlowEdge consumerEdge, double newSupply) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); + } + + if (supplies.get(idx) == newSupply) { + return; + } + + supplies.set(idx, newSupply); + consumerEdge.pushSupply(newSupply); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java new file mode 100644 index 00000000..b7162508 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +/** + * An edge that connects two FlowStages. + * A connection between FlowStages always consist of a FlowStage that demands + * something, and a FlowStage that Delivers something + * For instance, this could be the connection between a workload, and its machine + */ +public class FlowEdge { + private FlowConsumer consumer; + private FlowSupplier supplier; + + private int consumerIndex = -1; + private int supplierIndex = -1; + + private double demand = 0.0; + private double supply = 0.0; + + private double capacity; + + public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { + if (!(consumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(supplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + this.consumer = consumer; + this.supplier = supplier; + + this.capacity = supplier.getCapacity(); + + this.consumer.addSupplierEdge(this); + this.supplier.addConsumerEdge(this); + } + + public void close() { + if (this.consumer != null) { + this.consumer.removeSupplierEdge(this); + this.consumer = null; + } + + if (this.supplier != null) { + this.supplier.removeConsumerEdge(this); + this.supplier = null; + } + } + + public FlowConsumer getConsumer() { + return consumer; + } + + public FlowSupplier getSupplier() { + return supplier; + } + + public double getCapacity() { + return capacity; + } + + public double getDemand() { + return this.demand; + } + + public double getSupply() { + return this.supply; + } + + public int getConsumerIndex() { + return consumerIndex; + } + + public void setConsumerIndex(int consumerIndex) { + this.consumerIndex = consumerIndex; + } + + public int getSupplierIndex() { + return supplierIndex; + } + + public void setSupplierIndex(int supplierIndex) { + this.supplierIndex = supplierIndex; + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + if (newDemand == this.demand) { + return; + } + + this.demand = newDemand; + this.supplier.handleDemand(this, newDemand); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + if (newSupply == this.supply) { + return; + } + + this.supply = newSupply; + this.consumer.handleSupply(this, newSupply); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java new file mode 100644 index 00000000..0e6e137c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.util.ArrayList; +import java.util.HashMap; +import org.opendc.simulator.engine.engine.FlowEngine; + +public class FlowGraph { + private final FlowEngine engine; + private final ArrayList nodes = new ArrayList<>(); + private final ArrayList edges = new ArrayList<>(); + private final HashMap> nodeToEdge = new HashMap<>(); + + public FlowGraph(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + public FlowEngine getEngine() { + return engine; + } + + /** + * Create a new {@link FlowNode} representing a node in the flow network. + */ + public void addNode(FlowNode node) { + if (nodes.contains(node)) { + System.out.println("Node already exists"); + } + nodes.add(node); + nodeToEdge.put(node, new ArrayList<>()); + long now = this.engine.getClock().millis(); + node.invalidate(now); + } + + /** + * Internal method to remove the specified {@link FlowNode} from the graph. + */ + public void removeNode(FlowNode node) { + + // Remove all edges connected to node + final ArrayList connectedEdges = nodeToEdge.get(node); + while (connectedEdges.size() > 0) { + removeEdge(connectedEdges.get(0)); + } + + nodeToEdge.remove(node); + + // remove the node + nodes.remove(node); + } + + /** + * Add an edge between the specified consumer and supplier in this graph. + */ + public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { + // Check if the consumer and supplier are both FlowNodes + if (!(flowConsumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(flowSupplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + // Check of the consumer and supplier are present in this graph + if (!(this.nodes.contains((FlowNode) flowConsumer))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + if (!(this.nodes.contains((FlowNode) flowSupplier))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + + final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); + + edges.add(flowEdge); + + nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); + nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); + } + + public void removeEdge(FlowEdge flowEdge) { + final FlowConsumer consumer = flowEdge.getConsumer(); + final FlowSupplier supplier = flowEdge.getSupplier(); + nodeToEdge.get((FlowNode) consumer).remove(flowEdge); + nodeToEdge.get((FlowNode) supplier).remove(flowEdge); + + edges.remove(flowEdge); + flowEdge.close(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java new file mode 100644 index 00000000..6ee947bc --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.time.InstantSource; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowNode} represents a node in a {@link FlowGraph}. + */ +public abstract class FlowNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); + + protected enum NodeState { + PENDING, // Stage is active, but is not running any updates + UPDATING, // Stage is active, and running an update + INVALIDATED, // Stage is deemed invalid, and should run an update + CLOSING, // Stage is being closed, final updates can still be run + CLOSED // Stage is closed and should not run any updates + } + + protected NodeState nodeState = NodeState.PENDING; + + public NodeState getNodeState() { + return nodeState; + } + + public void setNodeState(NodeState nodeState) { + this.nodeState = nodeState; + } + + public int getTimerIndex() { + return timerIndex; + } + + public void setTimerIndex(int index) { + this.timerIndex = index; + } + + public InstantSource getClock() { + return clock; + } + + public void setClock(InstantSource clock) { + this.clock = clock; + } + + public FlowGraph getParentGraph() { + return parentGraph; + } + + public void setParentGraph(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + } + + public FlowEngine getEngine() { + return engine; + } + + public void setEngine(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + + /** + * The deadline of the stage after which an update should run. + */ + private long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + private int timerIndex = -1; + + protected InstantSource clock; + protected FlowGraph parentGraph; + protected FlowEngine engine; + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Construct a new {@link FlowNode} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + */ + public FlowNode(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + + this.parentGraph.addNode(this); + } + + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + * + *

+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + public void invalidate(long now) { + // If there is already an update running, + // notify the update, that a next update should be run after + if (this.nodeState == NodeState.UPDATING) { + this.nodeState = NodeState.INVALIDATED; + } else { + engine.scheduleImmediate(now, this); + } + } + + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + */ + public void invalidate() { + invalidate(clock.millis()); + } + + /** + * Update the state of the stage. + */ + public void update(long now) { + this.nodeState = NodeState.UPDATING; + + long newDeadline = this.deadline; + + try { + newDeadline = this.onUpdate(now); + } catch (Exception e) { + doFail(e); + } + + // Check whether the stage is marked as closing. + if (this.nodeState == NodeState.INVALIDATED) { + newDeadline = now; + } + if (this.nodeState == NodeState.CLOSING) { + closeNode(); + return; + } + + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + + this.nodeState = NodeState.PENDING; + } + + /** + * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated. + * + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + public abstract long onUpdate(long now); + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + closeNode(); + } + + /** + * This method is invoked when the {@link FlowNode} exits successfully or due to failure. + */ + public void closeNode() { + if (this.nodeState == NodeState.CLOSED) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); + return; + } + + // If this stage is running an update, notify it that is should close after. + if (this.nodeState == NodeState.UPDATING) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); + this.nodeState = NodeState.CLOSING; + return; + } + + // Mark the stage as closed + this.nodeState = NodeState.CLOSED; + + // Remove stage from parent graph + this.parentGraph.removeNode(this); + + // Remove stage from the timer queue + this.deadline = Long.MAX_VALUE; + this.engine.scheduleDelayedInContext(this); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java new file mode 100644 index 00000000..84602ee0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +public interface FlowSupplier { + + void handleDemand(FlowEdge consumerEdge, double newDemand); + + void pushSupply(FlowEdge consumerEdge, double newSupply); + + void addConsumerEdge(FlowEdge consumerEdge); + + void removeConsumerEdge(FlowEdge consumerEdge); + + double getCapacity(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt index 7744d7b2..4dd17dbe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -import org.opendc.simulator.engine.InvocationStack +import org.opendc.simulator.engine.engine.InvocationStack /** * Test suite for the [InvocationStack] class. -- cgit v1.2.3