diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src')
5 files changed, 31 insertions, 192 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt deleted file mode 100644 index 0e354cbd..00000000 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt +++ /dev/null @@ -1,85 +0,0 @@ -package org.opendc.experiments.base.runner - -import kotlinx.serialization.ExperimentalSerializationApi -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.json.decodeFromStream -import java.io.BufferedReader -import java.io.BufferedWriter -import java.io.File -import java.io.IOException -import java.io.InputStream -import java.io.InputStreamReader -import java.io.OutputStream -import java.io.OutputStreamWriter -import java.net.ServerSocket -import java.net.Socket -import java.net.SocketImpl -import java.sql.Connection -import java.sql.DriverManager -import java.sql.SQLException - - -/** - * @property name - * @property backlog the amount of connections to accept - * @property databasePath - * @property address IPv4 address - * @property port - */ -@Serializable -public data class Config( - var name: String = "", - var backlog: Int = 0, - val databasePath: String = "", - val address: String = "", - val port: Int = 8080, -){ - public companion object{ - public var input: InputStream? = null - public var output: OutputStream? = null - public var database: Connection? = null - - public var socket: Socket? = null - - public fun setConfigSocket(socket: Socket?){ - this.socket = socket - try { - input = socket?.getInputStream() - output = socket?.getOutputStream() - } catch (e: IOException){ - print("${e.message}") - } - } - - public fun getConfigReader() : InputStream? { - return input - } - - public fun getConfigWriter() : OutputStream? { - return output - } - - public fun setupDatabase(path : String) : Connection?{ - val url = "jdbc:sqlite:$path" - try { - this.database = DriverManager.getConnection(url) - return database - } catch (e : SQLException) { - print("${e.message}") - return null - } - } - } -} -/** - * Reads `config.json` into Config data class. - */ -public class ConfigReader { - private val jsonReader = Json - public fun read(file: File): Config = read(file.inputStream()) - @OptIn(ExperimentalSerializationApi::class) - public fun read(input: InputStream): Config { - return jsonReader.decodeFromStream<Config>(input) - } -} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index b9395001..ad662f25 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -28,80 +28,47 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.defaultLazy import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file +import org.opendc.common.utils.HTTPClient import org.opendc.experiments.base.experiment.getExperiment import java.io.File import java.io.IOException -import java.net.InetAddress -import java.net.ServerSocket -import java.net.Socket /** * Main entrypoint of the application. */ public fun main(args: Array<String>) { - if(args.size == 4) ExperimentCommand().main(args) + if(args.size == 2) ExperimentCommand().main(args) else ExperimentListener().main(args) } -/** - * Opens a client socket from `config`, but otherwise works as before. - */ internal class ExperimentCommand : CliktCommand(name = "experiment") { - private val configPath by option("--config-path", help = "path to config file") - .file(canBeDir = false, canBeFile = true) - .defaultLazy { File("resources/config.json") } - private val experimentPath by option("--experiment-path", help = "path to experiment file") .file(canBeDir = false, canBeFile = true) .defaultLazy { File("resources/experiment.json") } override fun run() { - val configReader = ConfigReader() - val config = configReader.read(configPath) - var clientSocket : Socket? = null - try { - clientSocket = Socket(config.address, config.port) - Config.setConfigSocket(clientSocket) val experiment = getExperiment(experimentPath) + HTTPClient.getInstance()?.sendExperiment(experimentPath) runExperiment(experiment) } catch (e: IOException) { println("${e.message}") - } finally { - clientSocket?.close() } } } - /** - * Creates a server socket and database connection from `config`. + * Entry point to the digital twin. + * + * @author Mateusz Kwiatkowski */ internal class ExperimentListener: CliktCommand(name = "listener") { - private val configPath by option("--config-path", help = "path to config file") - .file(canBeDir = false, canBeFile = true) - .defaultLazy { File("resources/config.json") } - override fun run() { - val configReader = ConfigReader() - var serverSocket: ServerSocket? = null - val config = configReader.read(configPath) - val inetAddress = InetAddress.getByName(config.address) - try { - serverSocket = ServerSocket(config.port, config.backlog, inetAddress) - Config.setupDatabase(config.databasePath) - runListener(serverSocket) + runListener() } catch (e: IOException) { println("${e.message}") - } finally { - serverSocket?.close() } } -} - - - - - +}
\ No newline at end of file diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt index 312cb0ac..8eab48e6 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt @@ -24,17 +24,10 @@ package org.opendc.experiments.base.runner import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle +import org.opendc.common.utils.JavalinRunner +import org.opendc.common.utils.PostgresqlDB +import org.opendc.common.utils.Redis import org.opendc.experiments.base.experiment.Scenario -import java.io.BufferedReader -import java.io.BufferedWriter -import java.io.IOException -import java.io.InputStream -import java.io.InputStreamReader -import java.io.OutputStream -import java.io.OutputStreamWriter -import java.net.ServerSocket -import java.net.Socket - /** * Run scenario when no pool is available for parallel execution @@ -61,9 +54,6 @@ public fun runExperiment(experiment: List<Scenario>) { println("$ansiGreen================================================================================$ansiReset") for (seed in 0..<scenario.runs) { - - Config.getConfigWriter()?.write("123\n".toByteArray()) - println("$ansiBlue Starting seed: $seed $ansiReset") runScenario(scenario, seed.toLong()) pb.step() @@ -73,28 +63,16 @@ public fun runExperiment(experiment: List<Scenario>) { } /** - * Accepts a (single) connection and listens for requests. - * @param socket The socket to listen to. + * Established a connection with PostgreSQL. + * Creates a Javalin HTTP server and listens for requests. + * + * @author Mateusz Kwiatkowski + * + * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a> */ -public fun runListener(socket: ServerSocket) { - var client : Socket? = null - try { - client = socket.accept() - // here you should create another thread listening for sever-Kafka-client - // communication, and to store the incoming results into Postgresql - - Config.setConfigSocket(client) - var request = ByteArray(1024) - - while(true){ - var ret : Int? = Config.getConfigReader()?.read(request) - if(ret == -1) break - if(ret != null && ret > 0) runRequest(String(request, 0, ret)) - } - - } catch (e: IOException) { - println("${e.message}") - } finally { - client?.close() - } +public fun runListener() { + PostgresqlDB() + JavalinRunner() + Redis().run() + println("Hello world, this means that Javalin already runs on another thread.") }
\ No newline at end of file diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt deleted file mode 100644 index 0316119f..00000000 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt +++ /dev/null @@ -1,12 +0,0 @@ -package org.opendc.experiments.base.runner - -/** - * Processes a single HTTP request. - * @param request The request to process. - */ - -public fun runRequest(request: String) { - // https://github.com/am-i-helpful/opendc/blob/master/opendc-oda/opendc-oda-experiments/src/main/kotlin/org/opendc/oda/experimentrunner/ODAComputeMonitor.kt - // Do this here - println("The request is $request\n") -}
\ No newline at end of file diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 6d094eb4..5a9a4f3c 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -30,10 +30,8 @@ import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig -import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor -import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns import org.opendc.compute.topology.clusterTopology +import org.opendc.compute.simulator.telemetry.KafkaComputeMonitor import org.opendc.experiments.base.experiment.Scenario import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler @@ -136,11 +134,7 @@ public fun runScenario( provisioner, serviceDomain, scenario, - seed, startTime, - scenario.id, - computeExportConfig = - scenario.exportModelSpec.computeExportConfig.withGpuColumns(gpuCount), ) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! @@ -181,7 +175,7 @@ public fun runScenario( } /** - * Saves the simulation results into a specific output folder received from the input.A + * Saves the simulation results into a specific output folder received from the input. * * @param provisioner The provisioner used to setup and run the simulation. * @param serviceDomain The domain of the compute service. @@ -193,21 +187,18 @@ public fun addExportModel( provisioner: Provisioner, serviceDomain: String, scenario: Scenario, - seed: Long, startTime: Duration, - index: Int, - computeExportConfig: ComputeExportConfig = scenario.exportModelSpec.computeExportConfig, ) { + + /* + * Here is the entry point to KafkaComputeMonitor(). + * With this setting, the simulator no longer writes to parquet files. + * To get back the original code, refer to https://github.com/atlarge-research/opendc + * */ provisioner.runStep( registerComputeMonitor( serviceDomain, - ParquetComputeMonitor( - File("${scenario.outputFolder}/raw-output/$index"), - "seed=$seed", - bufferSize = 4096, - scenario.exportModelSpec.filesToExportDict, - computeExportConfig = computeExportConfig, - ), + KafkaComputeMonitor(), Duration.ofSeconds(scenario.exportModelSpec.exportInterval), startTime, scenario.exportModelSpec.filesToExportDict, |
