diff options
Diffstat (limited to 'opendc-experiments')
4 files changed, 14 insertions, 21 deletions
diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index bac04854..76de6a3c 100644 --- a/opendc-experiments/opendc-experiments-base/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -85,4 +85,6 @@ distributions { } repositories { mavenCentral() + // @Mateusz crucial to include this for kafka proto + maven(url = "https://packages.confluent.io/maven/") } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index 874f5654..96071833 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -30,6 +30,7 @@ import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file import org.opendc.common.utils.Config import org.opendc.common.utils.ConfigReader +import org.opendc.common.utils.Kafka import org.opendc.common.utils.PostgresqlDB import org.opendc.experiments.base.experiment.getExperiment import java.io.File @@ -49,6 +50,7 @@ public fun main(args: Array<String>) { /** + * @author Mateusz * Opens a client socket from `config`, but otherwise works as before. */ internal class ExperimentCommand : CliktCommand(name = "experiment") { @@ -68,6 +70,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { try { clientSocket = Socket(config.address, config.port) Config.setConfigSocket(clientSocket) + Config.setKafkaInstance(Kafka(config.topic, config.address, config.kafka)) val experiment = getExperiment(experimentPath) runExperiment(experiment) @@ -81,6 +84,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { } /** + * @author Mateusz * Creates a server socket and database connection from `config`. */ internal class ExperimentListener: CliktCommand(name = "listener") { @@ -92,14 +96,11 @@ internal class ExperimentListener: CliktCommand(name = "listener") { val configReader = ConfigReader() var serverSocket: ServerSocket? = null val config = configReader.read(configPath) - val database = PostgresqlDB() - val inetAddress = InetAddress.getByName(config.address) + Config.setDB(PostgresqlDB(config.address, config.postgresql, config.database, config.username, config.password)) try { + val inetAddress = InetAddress.getByName(config.address) serverSocket = ServerSocket(config.port, config.backlog, inetAddress) - database.setupDatabase(config.address, config.postgresql, config.username, config.password, config.database) - database.clear() - database.create() runListener(serverSocket) } catch (e: IOException) { println("${e.message}") @@ -107,9 +108,4 @@ internal class ExperimentListener: CliktCommand(name = "listener") { serverSocket?.close() } } -} - - - - - +}
\ No newline at end of file diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt index 9d1f7374..9fee6cf9 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt @@ -65,6 +65,7 @@ public fun runExperiment(experiment: List<Scenario>) { } /** + * @author Mateusz * Accepts a (single) connection and listens for requests. * @param socket The socket to listen to. */ @@ -72,9 +73,6 @@ public fun runListener(socket: ServerSocket) { var client : Socket? = null try { client = socket.accept() - // here you should create another thread listening for sever-Kafka-client - // communication, and to store the incoming results into Postgresql - Config.setConfigSocket(client) val request = ByteArray(1024) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index f33b6da8..ab25ef25 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -30,11 +30,8 @@ import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig -import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor -import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns import org.opendc.compute.topology.clusterTopology -import org.opendc.demo.DemoComputeMonitor +import org.opendc.compute.simulator.telemetry.KafkaComputeMonitor import org.opendc.experiments.base.experiment.Scenario import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler @@ -194,15 +191,15 @@ public fun addExportModel( ) { /* - * @Mateusz - * Here is the entry point to DemoComputeMonitor(). + * @author Mateusz + * Here is the entry point to KafkaComputeMonitor(). * With this setting, the simulator no longer writes to parquet files. * To get back the original code, refer to https://github.com/atlarge-research/opendc * */ provisioner.runStep( registerComputeMonitor( serviceDomain, - DemoComputeMonitor(), + KafkaComputeMonitor(), Duration.ofSeconds(scenario.exportModelSpec.exportInterval), startTime, scenario.exportModelSpec.filesToExportDict, |
