summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-10-29 16:45:21 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-10-29 16:45:21 +0100
commit548e9c0f1c87afe92579d518c31b81ef29c3ab33 (patch)
treef0a96c6eb00743e4bcd445b57c41bcdd21117795
parenta4d2f94f950f60a7960e008d26e099f639acfe9a (diff)
Add support for ObjectId in simulator
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt11
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt3
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt9
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt9
4 files changed, 18 insertions, 14 deletions
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 26577ef2..d21d000b 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -38,6 +38,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
import org.bson.Document
+import org.bson.types.ObjectId
import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.sc20.experiment.attachMonitor
import org.opendc.experiments.sc20.experiment.createFailureDomain
@@ -165,7 +166,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
* Run a single scenario.
*/
private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
- val id = scenario.getString("_id")
+ val id = scenario.getObjectId("_id")
logger.info { "Constructing performance interference model" }
@@ -206,7 +207,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
) {
- val id = scenario.getString("_id")
+ val id = scenario.getObjectId("_id")
val seed = repeat
val traceDocument = scenario.get("trace", Document::class.java)
val workloadName = traceDocument.getString("traceId")
@@ -240,7 +241,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
Workload(workloadName, workloadFraction),
seed
)
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
val environment = TopologyParser(topologies, topologyId)
val monitor = ParquetExperimentMonitor(
outputPath,
@@ -321,7 +322,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
continue
}
- val id = scenario.getString("_id")
+ val id = scenario.getObjectId("_id")
logger.info { "Found queued scenario $id: attempting to claim" }
@@ -340,7 +341,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
}
try {
- val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
+ val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!!
runScenario(portfolio, scenario, topologies)
logger.info { "Starting result processing" }
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
index f9a3cd2b..979e5d3c 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
@@ -27,6 +27,7 @@ import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
+import org.bson.types.ObjectId
import java.io.File
/**
@@ -36,7 +37,7 @@ public class ResultProcessor(private val master: String, private val outputPath:
/**
* Process the results of the scenario with the given [id].
*/
- public fun process(id: String): Result {
+ public fun process(id: ObjectId): Result {
val spark = SparkSession.builder()
.master(master)
.appName("opendc-simulator-$id")
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
index 504fccdc..14f66757 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
@@ -26,6 +26,7 @@ import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import org.bson.Document
+import org.bson.types.ObjectId
import java.time.Instant
/**
@@ -44,7 +45,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
/**
* Claim the scenario in the database with the specified id.
*/
- public fun claim(id: String): Boolean {
+ public fun claim(id: ObjectId): Boolean {
val res = collection.findOneAndUpdate(
Filters.and(
Filters.eq("_id", id),
@@ -61,7 +62,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
/**
* Update the heartbeat of the specified scenario.
*/
- public fun heartbeat(id: String) {
+ public fun heartbeat(id: ObjectId) {
collection.findOneAndUpdate(
Filters.and(
Filters.eq("_id", id),
@@ -74,7 +75,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
/**
* Mark the scenario as failed.
*/
- public fun fail(id: String) {
+ public fun fail(id: ObjectId) {
collection.findOneAndUpdate(
Filters.eq("_id", id),
Updates.combine(
@@ -87,7 +88,7 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
/**
* Persist the specified results.
*/
- public fun finish(id: String, result: ResultProcessor.Result) {
+ public fun finish(id: ObjectId, result: ResultProcessor.Result) {
collection.findOneAndUpdate(
Filters.eq("_id", id),
Updates.combine(
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index 5e483271..0d1e96e6 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -31,6 +31,7 @@ import com.mongodb.client.model.Projections
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
+import org.bson.types.ObjectId
import org.opendc.compute.core.metal.NODE_CLUSTER
import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.compute.core.metal.service.SimpleProvisioningService
@@ -51,7 +52,7 @@ import java.util.*
/**
* A helper class that converts the MongoDB topology into an OpenDC environment.
*/
-public class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
+public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader {
/**
* Parse the topology with the specified [id].
*/
@@ -60,7 +61,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
val random = Random(0)
for (machine in fetchMachines(id)) {
- val clusterId = machine.getString("rack_id")
+ val clusterId = machine.getObjectId("rack_id")
val position = machine.getInteger("position")
val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
@@ -121,7 +122,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
/**
* Fetch the metadata of the topology.
*/
- private fun fetchName(id: String): String {
+ private fun fetchName(id: ObjectId): String {
return collection.aggregate(
listOf(
Aggregates.match(Filters.eq("_id", id)),
@@ -135,7 +136,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
/**
* Fetch a topology from the database with the specified [id].
*/
- private fun fetchMachines(id: String): AggregateIterable<Document> {
+ private fun fetchMachines(id: ObjectId): AggregateIterable<Document> {
return collection.aggregate(
listOf(
Aggregates.match(Filters.eq("_id", id)),