diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-29 16:45:21 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-29 16:45:21 +0100 |
| commit | 548e9c0f1c87afe92579d518c31b81ef29c3ab33 (patch) | |
| tree | f0a96c6eb00743e4bcd445b57c41bcdd21117795 | |
| parent | a4d2f94f950f60a7960e008d26e099f639acfe9a (diff) | |
Add support for ObjectId in simulator
4 files changed, 18 insertions, 14 deletions
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 26577ef2..d21d000b 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -38,6 +38,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document +import org.bson.types.ObjectId import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.sc20.experiment.attachMonitor import org.opendc.experiments.sc20.experiment.createFailureDomain @@ -165,7 +166,7 @@ public class RunnerCli : CliktCommand(name = "runner") { * Run a single scenario. */ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) { - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") logger.info { "Constructing performance interference model" } @@ -206,7 +207,7 @@ public class RunnerCli : CliktCommand(name = "runner") { traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? ) { - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") val seed = repeat val traceDocument = scenario.get("trace", Document::class.java) val workloadName = traceDocument.getString("traceId") @@ -240,7 +241,7 @@ public class RunnerCli : CliktCommand(name = "runner") { Workload(workloadName, workloadFraction), seed ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = TopologyParser(topologies, topologyId) val monitor = ParquetExperimentMonitor( outputPath, @@ -321,7 +322,7 @@ public class RunnerCli : CliktCommand(name = "runner") { continue } - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") logger.info { "Found queued scenario $id: attempting to claim" } @@ -340,7 +341,7 @@ public class RunnerCli : CliktCommand(name = "runner") { } try { - val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! + val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!! runScenario(portfolio, scenario, topologies) logger.info { "Starting result processing" } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt index f9a3cd2b..979e5d3c 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt @@ -27,6 +27,7 @@ import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.* +import org.bson.types.ObjectId import java.io.File /** @@ -36,7 +37,7 @@ public class ResultProcessor(private val master: String, private val outputPath: /** * Process the results of the scenario with the given [id]. */ - public fun process(id: String): Result { + public fun process(id: ObjectId): Result { val spark = SparkSession.builder() .master(master) .appName("opendc-simulator-$id") diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt index 504fccdc..14f66757 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt @@ -26,6 +26,7 @@ import com.mongodb.client.MongoCollection import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import org.bson.Document +import org.bson.types.ObjectId import java.time.Instant /** @@ -44,7 +45,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>) /** * Claim the scenario in the database with the specified id. */ - public fun claim(id: String): Boolean { + public fun claim(id: ObjectId): Boolean { val res = collection.findOneAndUpdate( Filters.and( Filters.eq("_id", id), @@ -61,7 +62,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>) /** * Update the heartbeat of the specified scenario. */ - public fun heartbeat(id: String) { + public fun heartbeat(id: ObjectId) { collection.findOneAndUpdate( Filters.and( Filters.eq("_id", id), @@ -74,7 +75,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>) /** * Mark the scenario as failed. */ - public fun fail(id: String) { + public fun fail(id: ObjectId) { collection.findOneAndUpdate( Filters.eq("_id", id), Updates.combine( @@ -87,7 +88,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>) /** * Persist the specified results. */ - public fun finish(id: String, result: ResultProcessor.Result) { + public fun finish(id: ObjectId, result: ResultProcessor.Result) { collection.findOneAndUpdate( Filters.eq("_id", id), Updates.combine( diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 5e483271..0d1e96e6 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -31,6 +31,7 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import org.bson.types.ObjectId import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.compute.core.metal.service.SimpleProvisioningService @@ -51,7 +52,7 @@ import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. */ -public class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader { +public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader { /** * Parse the topology with the specified [id]. */ @@ -60,7 +61,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p val random = Random(0) for (machine in fetchMachines(id)) { - val clusterId = machine.getString("rack_id") + val clusterId = machine.getObjectId("rack_id") val position = machine.getInteger("position") val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> @@ -121,7 +122,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p /** * Fetch the metadata of the topology. */ - private fun fetchName(id: String): String { + private fun fetchName(id: ObjectId): String { return collection.aggregate( listOf( Aggregates.match(Filters.eq("_id", id)), @@ -135,7 +136,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p /** * Fetch a topology from the database with the specified [id]. */ - private fun fetchMachines(id: String): AggregateIterable<Document> { + private fun fetchMachines(id: ObjectId): AggregateIterable<Document> { return collection.aggregate( listOf( Aggregates.match(Filters.eq("_id", id)), |
