summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-base/build.gradle.kts2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt18
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt4
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt11
4 files changed, 14 insertions, 21 deletions
diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts
index bac04854..76de6a3c 100644
--- a/opendc-experiments/opendc-experiments-base/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts
@@ -85,4 +85,6 @@ distributions {
}
repositories {
mavenCentral()
+ // @Mateusz crucial to include this for kafka proto
+ maven(url = "https://packages.confluent.io/maven/")
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index 874f5654..96071833 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -30,6 +30,7 @@ import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.file
import org.opendc.common.utils.Config
import org.opendc.common.utils.ConfigReader
+import org.opendc.common.utils.Kafka
import org.opendc.common.utils.PostgresqlDB
import org.opendc.experiments.base.experiment.getExperiment
import java.io.File
@@ -49,6 +50,7 @@ public fun main(args: Array<String>) {
/**
+ * @author Mateusz
* Opens a client socket from `config`, but otherwise works as before.
*/
internal class ExperimentCommand : CliktCommand(name = "experiment") {
@@ -68,6 +70,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
try {
clientSocket = Socket(config.address, config.port)
Config.setConfigSocket(clientSocket)
+ Config.setKafkaInstance(Kafka(config.topic, config.address, config.kafka))
val experiment = getExperiment(experimentPath)
runExperiment(experiment)
@@ -81,6 +84,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
}
/**
+ * @author Mateusz
* Creates a server socket and database connection from `config`.
*/
internal class ExperimentListener: CliktCommand(name = "listener") {
@@ -92,14 +96,11 @@ internal class ExperimentListener: CliktCommand(name = "listener") {
val configReader = ConfigReader()
var serverSocket: ServerSocket? = null
val config = configReader.read(configPath)
- val database = PostgresqlDB()
- val inetAddress = InetAddress.getByName(config.address)
+ Config.setDB(PostgresqlDB(config.address, config.postgresql, config.database, config.username, config.password))
try {
+ val inetAddress = InetAddress.getByName(config.address)
serverSocket = ServerSocket(config.port, config.backlog, inetAddress)
- database.setupDatabase(config.address, config.postgresql, config.username, config.password, config.database)
- database.clear()
- database.create()
runListener(serverSocket)
} catch (e: IOException) {
println("${e.message}")
@@ -107,9 +108,4 @@ internal class ExperimentListener: CliktCommand(name = "listener") {
serverSocket?.close()
}
}
-}
-
-
-
-
-
+} \ No newline at end of file
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index 9d1f7374..9fee6cf9 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -65,6 +65,7 @@ public fun runExperiment(experiment: List<Scenario>) {
}
/**
+ * @author Mateusz
* Accepts a (single) connection and listens for requests.
* @param socket The socket to listen to.
*/
@@ -72,9 +73,6 @@ public fun runListener(socket: ServerSocket) {
var client : Socket? = null
try {
client = socket.accept()
- // here you should create another thread listening for sever-Kafka-client
- // communication, and to store the incoming results into Postgresql
-
Config.setConfigSocket(client)
val request = ByteArray(1024)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index f33b6da8..ab25ef25 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -30,11 +30,8 @@ import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
-import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
-import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns
import org.opendc.compute.topology.clusterTopology
-import org.opendc.demo.DemoComputeMonitor
+import org.opendc.compute.simulator.telemetry.KafkaComputeMonitor
import org.opendc.experiments.base.experiment.Scenario
import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec
import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler
@@ -194,15 +191,15 @@ public fun addExportModel(
) {
/*
- * @Mateusz
- * Here is the entry point to DemoComputeMonitor().
+ * @author Mateusz
+ * Here is the entry point to KafkaComputeMonitor().
* With this setting, the simulator no longer writes to parquet files.
* To get back the original code, refer to https://github.com/atlarge-research/opendc
* */
provisioner.runStep(
registerComputeMonitor(
serviceDomain,
- DemoComputeMonitor(),
+ KafkaComputeMonitor(),
Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
startTime,
scenario.exportModelSpec.filesToExportDict,