Diffstat (limited to 'opendc-experiments/opendc-experiments-base')
6 files changed, 34 insertions, 140 deletions
diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts
index ea764d4b..76de6a3c 100644
--- a/opendc-experiments/opendc-experiments-base/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts
@@ -40,8 +40,8 @@ dependencies {
     implementation(libs.progressbar)
     implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-core")))
 
-    //@Mateusz: for the sqlite database
-    implementation("org.xerial:sqlite-jdbc:3.36.0")
+    implementation(project(mapOf("path" to ":opendc-demo")))
+
     implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-workload")))
     implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology")))
     implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon")))
@@ -83,3 +83,8 @@ distributions {
         }
     }
 }
+repositories {
+    mavenCentral()
+    // @Mateusz crucial to include this for kafka proto
+    maven(url = "https://packages.confluent.io/maven/")
+}
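The Confluent artifacts used on the Kafka output path are not hosted on Maven Central, which is why the extra repository is declared above. As a hedged sketch, the dependency block it would typically serve looks like this; the coordinates are real Apache/Confluent artifacts, but the versions and the assumption that the demo module uses the Protobuf serde are illustrative:

    dependencies {
        // Plain Kafka client, resolvable from Maven Central.
        implementation("org.apache.kafka:kafka-clients:3.7.0")
        // Protobuf serde for Kafka, hosted on packages.confluent.io; assumed to be
        // the "kafka proto" artifact the comment in the diff refers to.
        implementation("io.confluent:kafka-protobuf-serializer:7.6.0")
    }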
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt
deleted file mode 100644
index 0e354cbd..00000000
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.opendc.experiments.base.runner
-
-import kotlinx.serialization.ExperimentalSerializationApi
-import kotlinx.serialization.Serializable
-import kotlinx.serialization.json.Json
-import kotlinx.serialization.json.decodeFromStream
-import java.io.BufferedReader
-import java.io.BufferedWriter
-import java.io.File
-import java.io.IOException
-import java.io.InputStream
-import java.io.InputStreamReader
-import java.io.OutputStream
-import java.io.OutputStreamWriter
-import java.net.ServerSocket
-import java.net.Socket
-import java.net.SocketImpl
-import java.sql.Connection
-import java.sql.DriverManager
-import java.sql.SQLException
-
-
-/**
- * @property name
- * @property backlog the amount of connections to accept
- * @property databasePath
- * @property address IPv4 address
- * @property port
- */
-@Serializable
-public data class Config(
-    var name: String = "",
-    var backlog: Int = 0,
-    val databasePath: String = "",
-    val address: String = "",
-    val port: Int = 8080,
-){
-    public companion object{
-        public var input: InputStream? = null
-        public var output: OutputStream? = null
-        public var database: Connection? = null
-
-        public var socket: Socket? = null
-
-        public fun setConfigSocket(socket: Socket?){
-            this.socket = socket
-            try {
-                input = socket?.getInputStream()
-                output = socket?.getOutputStream()
-            } catch (e: IOException){
-                print("${e.message}")
-            }
-        }
-
-        public fun getConfigReader() : InputStream? {
-            return input
-        }
-
-        public fun getConfigWriter() : OutputStream? {
-            return output
-        }
-
-        public fun setupDatabase(path : String) : Connection?{
-            val url = "jdbc:sqlite:$path"
-            try {
-                this.database = DriverManager.getConnection(url)
-                return database
-            } catch (e : SQLException) {
-                print("${e.message}")
-                return null
-            }
-        }
-    }
-}
-/**
- * Reads `config.json` into Config data class.
- */
-public class ConfigReader {
-    private val jsonReader = Json
-    public fun read(file: File): Config = read(file.inputStream())
-    @OptIn(ExperimentalSerializationApi::class)
-    public fun read(input: InputStream): Config {
-        return jsonReader.decodeFromStream<Config>(input)
-    }
-}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index b9395001..96071833 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -28,6 +28,10 @@ import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.options.defaultLazy
 import com.github.ajalt.clikt.parameters.options.option
 import com.github.ajalt.clikt.parameters.types.file
+import org.opendc.common.utils.Config
+import org.opendc.common.utils.ConfigReader
+import org.opendc.common.utils.Kafka
+import org.opendc.common.utils.PostgresqlDB
 import org.opendc.experiments.base.experiment.getExperiment
 import java.io.File
 import java.io.IOException
@@ -43,7 +47,10 @@ public fun main(args: Array<String>) {
     else ExperimentListener().main(args)
 }
 
+
+
 /**
+ * @author Mateusz
  * Opens a client socket from `config`, but otherwise works as before.
  */
 internal class ExperimentCommand : CliktCommand(name = "experiment") {
@@ -63,6 +70,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
         try {
             clientSocket = Socket(config.address, config.port)
             Config.setConfigSocket(clientSocket)
+            Config.setKafkaInstance(Kafka(config.topic, config.address, config.kafka))
 
             val experiment = getExperiment(experimentPath)
             runExperiment(experiment)
@@ -76,6 +84,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
 }
 
 /**
+ * @author Mateusz
  * Creates a server socket and database connection from `config`.
  */
 internal class ExperimentListener: CliktCommand(name = "listener") {
@@ -87,11 +96,11 @@ internal class ExperimentListener: CliktCommand(name = "listener") {
         val configReader = ConfigReader()
         var serverSocket: ServerSocket? = null
         val config = configReader.read(configPath)
-        val inetAddress = InetAddress.getByName(config.address)
+        Config.setDB(PostgresqlDB(config.address, config.postgresql, config.database, config.username, config.password))
 
         try {
+            val inetAddress = InetAddress.getByName(config.address)
             serverSocket = ServerSocket(config.port, config.backlog, inetAddress)
-            Config.setupDatabase(config.databasePath)
             runListener(serverSocket)
         } catch (e: IOException) {
             println("${e.message}")
@@ -99,9 +108,4 @@ internal class ExperimentListener: CliktCommand(name = "listener") {
             serverSocket?.close()
         }
     }
-}
-
-
-
-
-
+}
\ No newline at end of file
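The relocated org.opendc.common.utils.Config is not included in this diff, but the call sites above pin down its shape. The following is a hypothetical reconstruction inferred from those fields only; in particular, the types of `kafka` and `postgresql` are assumed to be broker and database ports:

    import kotlinx.serialization.Serializable

    // Hypothetical reconstruction of the relocated Config class, inferred from
    // the usage in ExperimentCli.kt above; not the actual class in opendc-common.
    @Serializable
    public data class Config(
        val address: String = "",    // host shared by the socket, Kafka, and PostgreSQL
        val port: Int = 8080,        // listener socket port
        val backlog: Int = 0,        // connections queued on the server socket
        val topic: String = "",      // Kafka topic for simulation output
        val kafka: Int = 9092,       // assumed: Kafka broker port
        val postgresql: Int = 5432,  // assumed: PostgreSQL port
        val database: String = "",   // PostgreSQL database name
        val username: String = "",
        val password: String = "",
    )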
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index 312cb0ac..9fee6cf9 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -24,17 +24,12 @@ package org.opendc.experiments.base.runner
 
 import me.tongfei.progressbar.ProgressBarBuilder
 import me.tongfei.progressbar.ProgressBarStyle
+import org.opendc.common.utils.Config
 import org.opendc.experiments.base.experiment.Scenario
-import java.io.BufferedReader
-import java.io.BufferedWriter
 import java.io.IOException
-import java.io.InputStream
-import java.io.InputStreamReader
-import java.io.OutputStream
-import java.io.OutputStreamWriter
 import java.net.ServerSocket
 import java.net.Socket
-
+import org.opendc.demo.runRequest
 
 /**
  * Run scenario when no pool is available for parallel execution
@@ -61,9 +56,6 @@ public fun runExperiment(experiment: List<Scenario>) {
         println("$ansiGreen================================================================================$ansiReset")
 
         for (seed in 0..<scenario.runs) {
-
-            Config.getConfigWriter()?.write("123\n".toByteArray())
-
             println("$ansiBlue Starting seed: $seed $ansiReset")
             runScenario(scenario, seed.toLong())
             pb.step()
@@ -73,6 +65,7 @@ public fun runExperiment(experiment: List<Scenario>) {
 }
 
 /**
+ * @author Mateusz
  * Accepts a (single) connection and listens for requests.
  * @param socket The socket to listen to.
  */
@@ -80,14 +73,11 @@ public fun runListener(socket: ServerSocket) {
     var client : Socket? = null
     try {
         client = socket.accept()
-        // here you should create another thread listening for sever-Kafka-client
-        // communication, and to store the incoming results into Postgresql
-
         Config.setConfigSocket(client)
 
-        var request = ByteArray(1024)
+        val request = ByteArray(1024)
         while(true){
-            var ret : Int? = Config.getConfigReader()?.read(request)
+            val ret : Int? = Config.getConfigReader()?.read(request)
             if(ret == -1) break
             if(ret != null && ret > 0) runRequest(String(request, 0, ret))
         }
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt
deleted file mode 100644
index 0316119f..00000000
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.opendc.experiments.base.runner
-
-/**
- * Processes a single HTTP request.
- * @param request The request to process.
- */
-
-public fun runRequest(request: String) {
-    // https://github.com/am-i-helpful/opendc/blob/master/opendc-oda/opendc-oda-experiments/src/main/kotlin/org/opendc/oda/experimentrunner/ODAComputeMonitor.kt
-    // Do this here
-    println("The request is $request\n")
-}
\ No newline at end of file
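For reference, a client driving runListener only needs to write raw bytes to the socket: each successful read hands the decoded chunk to org.opendc.demo.runRequest, and closing the connection makes read() return -1, which ends the loop. A minimal sketch; the host, port, and request payload are illustrative and would in practice come from config.json:

    import java.net.Socket

    fun main() {
        // Connect to the experiment listener and send a single request.
        Socket("127.0.0.1", 8080).use { socket ->
            val out = socket.getOutputStream()
            out.write("run-scenario-0".toByteArray()) // hypothetical payload
            out.flush()
        } // EOF on close makes the listener's read() return -1, ending its loop
    }

Note that the loop does not frame messages: two writes in quick succession may arrive in a single 1024-byte read and reach runRequest as one string.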
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 6d094eb4..ab25ef25 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -30,10 +30,8 @@ import org.opendc.compute.simulator.provisioner.setupComputeService
 import org.opendc.compute.simulator.provisioner.setupHosts
 import org.opendc.compute.simulator.scheduler.ComputeScheduler
 import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
-import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
-import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns
 import org.opendc.compute.topology.clusterTopology
+import org.opendc.compute.simulator.telemetry.KafkaComputeMonitor
 import org.opendc.experiments.base.experiment.Scenario
 import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec
 import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler
@@ -136,11 +134,7 @@ public fun runScenario(
             provisioner,
             serviceDomain,
             scenario,
-            seed,
             startTime,
-            scenario.id,
-            computeExportConfig =
-                scenario.exportModelSpec.computeExportConfig.withGpuColumns(gpuCount),
         )
 
         val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
@@ -181,7 +175,7 @@ public fun runScenario(
 }
 
 /**
- * Saves the simulation results into a specific output folder received from the input.A
+ * Saves the simulation results into a specific output folder received from the input.
  *
  * @param provisioner The provisioner used to setup and run the simulation.
 * @param serviceDomain The domain of the compute service.
@@ -193,21 +187,19 @@ public fun addExportModel(
     provisioner: Provisioner,
     serviceDomain: String,
     scenario: Scenario,
-    seed: Long,
     startTime: Duration,
-    index: Int,
-    computeExportConfig: ComputeExportConfig = scenario.exportModelSpec.computeExportConfig,
 ) {
+
+    /*
+     * @author Mateusz
+     * Here is the entry point to KafkaComputeMonitor().
+     * With this setting, the simulator no longer writes to parquet files.
+     * To get back the original code, refer to https://github.com/atlarge-research/opendc
+     */
     provisioner.runStep(
         registerComputeMonitor(
            serviceDomain,
-            ParquetComputeMonitor(
-                File("${scenario.outputFolder}/raw-output/$index"),
-                "seed=$seed",
-                bufferSize = 4096,
-                scenario.exportModelSpec.filesToExportDict,
-                computeExportConfig = computeExportConfig,
-            ),
+            KafkaComputeMonitor(),
             Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
             startTime,
             scenario.exportModelSpec.filesToExportDict,
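KafkaComputeMonitor itself is not part of this diff. Below is a hedged sketch of the producer side such a monitor could wrap, using only the standard Kafka client API; the topic and broker address are illustrative and would in practice come from the Kafka instance stored in Config:

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord

    // Sketch only: the real monitor presumably serializes host/task/service
    // telemetry; a pre-serialized string payload stands in for that record here.
    fun newProducer(brokers: String): KafkaProducer<String, String> {
        val props = Properties().apply {
            put("bootstrap.servers", brokers)
            put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        }
        return KafkaProducer(props)
    }

    fun publish(producer: KafkaProducer<String, String>, topic: String, payload: String) {
        // Asynchronous send, e.g. once per export interval.
        producer.send(ProducerRecord(topic, payload))
    }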
