summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-15 14:46:19 +0200
committerGitHub <noreply@github.com>2021-04-15 14:46:19 +0200
commitf586769b37994628fbdc7ab0756b1b96650fca10 (patch)
treea5c74ae02a812e050f4feec778f274705ce66076
parent890dd87376f0d131292e3cdc685ab13192d11634 (diff)
parente68046465d39611259fa13952add731e80ea7331 (diff)
Fix issues with web runner (v2) (#118)
This is a second pull request to address several issues that were present in the web runner and the associated experiments: * Re-use topology across repeats * Disallow re-use of `SimTraceWorkload` * Construct new `SimTraceWorkload` for each simulation run
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt4
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt15
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt16
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt4
4 files changed, 24 insertions, 15 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 997eba0c..763234f8 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -41,6 +41,7 @@ import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
@@ -261,6 +262,7 @@ public suspend fun processTrace(
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
val server = client.newServer(
entry.name,
image,
@@ -269,7 +271,7 @@ public suspend fun processTrace(
entry.meta["cores"] as Int,
entry.meta["required-memory"] as Long
),
- meta = entry.meta
+ meta = entry.meta + mapOf("workload" to workload)
)
suspendCancellableCoroutine { cont ->
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index a368dfee..31f7876e 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -31,7 +31,6 @@ import com.mongodb.MongoClientSettings
import com.mongodb.MongoCredential
import com.mongodb.ServerAddress
import com.mongodb.client.MongoClients
-import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
import io.opentelemetry.api.metrics.MeterProvider
@@ -51,6 +50,7 @@ import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.telemetry.sdk.toOtelClock
@@ -159,7 +159,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>): List<WebExperimentMonitor.Result> {
+ private suspend fun runScenario(portfolio: Document, scenario: Document, topologyParser: TopologyParser): List<WebExperimentMonitor.Result> {
val id = scenario.getObjectId("_id")
logger.info { "Constructing performance interference model" }
@@ -182,11 +182,13 @@ public class RunnerCli : CliktCommand(name = "runner") {
}
val targets = portfolio.get("targets", Document::class.java)
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+ val environment = topologyParser.read(topologyId)
val results = (0 until targets.getInteger("repeatsPerScenario")).map {
logger.info { "Starting repeat $it" }
withTimeout(runTimeout * 1000) {
- runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
+ runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader)
}
}
@@ -201,7 +203,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
private suspend fun runRepeat(
scenario: Document,
repeat: Int,
- topologies: MongoCollection<Document>,
+ environment: EnvironmentReader,
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
): WebExperimentMonitor.Result {
@@ -274,8 +276,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
Workload(workloadName, workloadFraction),
seed
)
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
- val environment = TopologyParser(topologies, topologyId)
val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0
withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
@@ -326,6 +326,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
val manager = ScenarioManager(database.getCollection("scenarios"))
val portfolios = database.getCollection("portfolios")
val topologies = database.getCollection("topologies")
+ val topologyParser = TopologyParser(topologies)
logger.info { "Watching for queued scenarios" }
@@ -357,7 +358,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
try {
val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!!
- val results = runScenario(portfolio, scenario, topologies)
+ val results = runScenario(portfolio, scenario, topologyParser)
logger.info { "Writing results to database" }
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index 1683cdb8..2dd63340 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -42,9 +42,12 @@ import java.util.*
/**
* A helper class that converts the MongoDB topology into an OpenDC environment.
*/
-public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader {
+public class TopologyParser(private val collection: MongoCollection<Document>) {
- public override fun read(): List<MachineDef> {
+ /**
+ * Parse the topology from the specified [id].
+ */
+ public fun read(id: ObjectId): EnvironmentReader {
val nodes = mutableListOf<MachineDef>()
val random = Random(0)
@@ -78,16 +81,17 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
"node-$clusterId-$position",
mapOf("cluster" to clusterId),
SimMachineModel(processors, memoryUnits),
- LinearPowerModel(2 * energyConsumptionW, .5)
+ LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
)
)
}
- return nodes
+ return object : EnvironmentReader {
+ override fun read(): List<MachineDef> = nodes
+ override fun close() {}
+ }
}
- override fun close() {}
-
/**
* Fetch the metadata of the topology.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 694a928b..ffb332d1 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -34,12 +34,14 @@ import org.opendc.simulator.resources.consumer.SimConsumerBarrier
* consumption for some period of time.
*/
public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload {
- private var offset = 0L
+ private var offset = Long.MIN_VALUE
private val iterator = trace.iterator()
private var fragment: Fragment? = null
private lateinit var barrier: SimConsumerBarrier
override fun onStart(ctx: SimMachineContext) {
+ check(offset == Long.MIN_VALUE) { "Workload does not support re-use" }
+
barrier = SimConsumerBarrier(ctx.cpus.size)
fragment = nextFragment()
offset = ctx.clock.millis()