summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-base/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt85
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt49
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt50
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt12
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt27
5 files changed, 31 insertions, 192 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt
deleted file mode 100644
index 0e354cbd..00000000
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ConfigParser.kt
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.opendc.experiments.base.runner
-
-import kotlinx.serialization.ExperimentalSerializationApi
-import kotlinx.serialization.Serializable
-import kotlinx.serialization.json.Json
-import kotlinx.serialization.json.decodeFromStream
-import java.io.BufferedReader
-import java.io.BufferedWriter
-import java.io.File
-import java.io.IOException
-import java.io.InputStream
-import java.io.InputStreamReader
-import java.io.OutputStream
-import java.io.OutputStreamWriter
-import java.net.ServerSocket
-import java.net.Socket
-import java.net.SocketImpl
-import java.sql.Connection
-import java.sql.DriverManager
-import java.sql.SQLException
-
-
-/**
- * @property name
- * @property backlog the amount of connections to accept
- * @property databasePath
- * @property address IPv4 address
- * @property port
- */
-@Serializable
-public data class Config(
- var name: String = "",
- var backlog: Int = 0,
- val databasePath: String = "",
- val address: String = "",
- val port: Int = 8080,
-){
- public companion object{
- public var input: InputStream? = null
- public var output: OutputStream? = null
- public var database: Connection? = null
-
- public var socket: Socket? = null
-
- public fun setConfigSocket(socket: Socket?){
- this.socket = socket
- try {
- input = socket?.getInputStream()
- output = socket?.getOutputStream()
- } catch (e: IOException){
- print("${e.message}")
- }
- }
-
- public fun getConfigReader() : InputStream? {
- return input
- }
-
- public fun getConfigWriter() : OutputStream? {
- return output
- }
-
- public fun setupDatabase(path : String) : Connection?{
- val url = "jdbc:sqlite:$path"
- try {
- this.database = DriverManager.getConnection(url)
- return database
- } catch (e : SQLException) {
- print("${e.message}")
- return null
- }
- }
- }
-}
-/**
- * Reads `config.json` into Config data class.
- */
-public class ConfigReader {
- private val jsonReader = Json
- public fun read(file: File): Config = read(file.inputStream())
- @OptIn(ExperimentalSerializationApi::class)
- public fun read(input: InputStream): Config {
- return jsonReader.decodeFromStream<Config>(input)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index b9395001..ad662f25 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -28,80 +28,47 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.defaultLazy
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.file
+import org.opendc.common.utils.HTTPClient
import org.opendc.experiments.base.experiment.getExperiment
import java.io.File
import java.io.IOException
-import java.net.InetAddress
-import java.net.ServerSocket
-import java.net.Socket
/**
* Main entrypoint of the application.
*/
public fun main(args: Array<String>) {
- if(args.size == 4) ExperimentCommand().main(args)
+ if(args.size == 2) ExperimentCommand().main(args)
else ExperimentListener().main(args)
}
-/**
- * Opens a client socket from `config`, but otherwise works as before.
- */
internal class ExperimentCommand : CliktCommand(name = "experiment") {
- private val configPath by option("--config-path", help = "path to config file")
- .file(canBeDir = false, canBeFile = true)
- .defaultLazy { File("resources/config.json") }
-
private val experimentPath by option("--experiment-path", help = "path to experiment file")
.file(canBeDir = false, canBeFile = true)
.defaultLazy { File("resources/experiment.json") }
override fun run() {
- val configReader = ConfigReader()
- val config = configReader.read(configPath)
- var clientSocket : Socket? = null
-
try {
- clientSocket = Socket(config.address, config.port)
- Config.setConfigSocket(clientSocket)
val experiment = getExperiment(experimentPath)
+ HTTPClient.getInstance()?.sendExperiment(experimentPath)
runExperiment(experiment)
} catch (e: IOException) {
println("${e.message}")
- } finally {
- clientSocket?.close()
}
}
}
-
/**
- * Creates a server socket and database connection from `config`.
+ * Entry point to the digital twin.
+ *
+ * @author Mateusz Kwiatkowski
*/
internal class ExperimentListener: CliktCommand(name = "listener") {
- private val configPath by option("--config-path", help = "path to config file")
- .file(canBeDir = false, canBeFile = true)
- .defaultLazy { File("resources/config.json") }
-
override fun run() {
- val configReader = ConfigReader()
- var serverSocket: ServerSocket? = null
- val config = configReader.read(configPath)
- val inetAddress = InetAddress.getByName(config.address)
-
try {
- serverSocket = ServerSocket(config.port, config.backlog, inetAddress)
- Config.setupDatabase(config.databasePath)
- runListener(serverSocket)
+ runListener()
} catch (e: IOException) {
println("${e.message}")
- } finally {
- serverSocket?.close()
}
}
-}
-
-
-
-
-
+} \ No newline at end of file
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index 312cb0ac..8eab48e6 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -24,17 +24,10 @@ package org.opendc.experiments.base.runner
import me.tongfei.progressbar.ProgressBarBuilder
import me.tongfei.progressbar.ProgressBarStyle
+import org.opendc.common.utils.JavalinRunner
+import org.opendc.common.utils.PostgresqlDB
+import org.opendc.common.utils.Redis
import org.opendc.experiments.base.experiment.Scenario
-import java.io.BufferedReader
-import java.io.BufferedWriter
-import java.io.IOException
-import java.io.InputStream
-import java.io.InputStreamReader
-import java.io.OutputStream
-import java.io.OutputStreamWriter
-import java.net.ServerSocket
-import java.net.Socket
-
/**
* Run scenario when no pool is available for parallel execution
@@ -61,9 +54,6 @@ public fun runExperiment(experiment: List<Scenario>) {
println("$ansiGreen================================================================================$ansiReset")
for (seed in 0..<scenario.runs) {
-
- Config.getConfigWriter()?.write("123\n".toByteArray())
-
println("$ansiBlue Starting seed: $seed $ansiReset")
runScenario(scenario, seed.toLong())
pb.step()
@@ -73,28 +63,16 @@ public fun runExperiment(experiment: List<Scenario>) {
}
/**
- * Accepts a (single) connection and listens for requests.
- * @param socket The socket to listen to.
+ * Established a connection with PostgreSQL.
+ * Creates a Javalin HTTP server and listens for requests.
+ *
+ * @author Mateusz Kwiatkowski
+ *
+ * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a>
*/
-public fun runListener(socket: ServerSocket) {
- var client : Socket? = null
- try {
- client = socket.accept()
- // here you should create another thread listening for sever-Kafka-client
- // communication, and to store the incoming results into Postgresql
-
- Config.setConfigSocket(client)
- var request = ByteArray(1024)
-
- while(true){
- var ret : Int? = Config.getConfigReader()?.read(request)
- if(ret == -1) break
- if(ret != null && ret > 0) runRequest(String(request, 0, ret))
- }
-
- } catch (e: IOException) {
- println("${e.message}")
- } finally {
- client?.close()
- }
+public fun runListener() {
+ PostgresqlDB()
+ JavalinRunner()
+ Redis().run()
+ println("Hello world, this means that Javalin already runs on another thread.")
} \ No newline at end of file
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt
deleted file mode 100644
index 0316119f..00000000
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/RequestRunner.kt
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.opendc.experiments.base.runner
-
-/**
- * Processes a single HTTP request.
- * @param request The request to process.
- */
-
-public fun runRequest(request: String) {
- // https://github.com/am-i-helpful/opendc/blob/master/opendc-oda/opendc-oda-experiments/src/main/kotlin/org/opendc/oda/experimentrunner/ODAComputeMonitor.kt
- // Do this here
- println("The request is $request\n")
-} \ No newline at end of file
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 6d094eb4..5a9a4f3c 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -30,10 +30,8 @@ import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig
-import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
-import org.opendc.compute.simulator.telemetry.parquet.withGpuColumns
import org.opendc.compute.topology.clusterTopology
+import org.opendc.compute.simulator.telemetry.KafkaComputeMonitor
import org.opendc.experiments.base.experiment.Scenario
import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec
import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler
@@ -136,11 +134,7 @@ public fun runScenario(
provisioner,
serviceDomain,
scenario,
- seed,
startTime,
- scenario.id,
- computeExportConfig =
- scenario.exportModelSpec.computeExportConfig.withGpuColumns(gpuCount),
)
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
@@ -181,7 +175,7 @@ public fun runScenario(
}
/**
- * Saves the simulation results into a specific output folder received from the input.A
+ * Saves the simulation results into a specific output folder received from the input.
*
* @param provisioner The provisioner used to setup and run the simulation.
* @param serviceDomain The domain of the compute service.
@@ -193,21 +187,18 @@ public fun addExportModel(
provisioner: Provisioner,
serviceDomain: String,
scenario: Scenario,
- seed: Long,
startTime: Duration,
- index: Int,
- computeExportConfig: ComputeExportConfig = scenario.exportModelSpec.computeExportConfig,
) {
+
+ /*
+ * Here is the entry point to KafkaComputeMonitor().
+ * With this setting, the simulator no longer writes to parquet files.
+ * To get back the original code, refer to https://github.com/atlarge-research/opendc
+ * */
provisioner.runStep(
registerComputeMonitor(
serviceDomain,
- ParquetComputeMonitor(
- File("${scenario.outputFolder}/raw-output/$index"),
- "seed=$seed",
- bufferSize = 4096,
- scenario.exportModelSpec.filesToExportDict,
- computeExportConfig = computeExportConfig,
- ),
+ KafkaComputeMonitor(),
Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
startTime,
scenario.exportModelSpec.filesToExportDict,