diff options
14 files changed, 159 insertions, 82 deletions
@@ -1,56 +1,21 @@ ### Dependencies -Paths are hardcoded. Be aware the project will only work with this OpenDC as there are hidden Maven dependencies. Kafka topic should be named `postgres_topic`. Confluent local (see https://www.confluent.io/installation/): -```bash -export CONFLUENT_HOME=/opt/confluent -export PATH=/opt/confluent/bin:$PATH -cd /opt/confluent -kafka-storage random-uuid -kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone -kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties -schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties -connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties -``` +PostgreSQL (see https://www.postgresql.org/) Confluent JDBC sink and source (includes Postgres connector) (see https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) Be mindful to configure the right `plugin.path` in `etc/kafka/connect-standalone.properties` -```bash -ln -s /home/matt/git/opendc/resources/experiments/sink-jdbc.properties /opt/confluent/share/confluent-common/connectors -``` - -Protobuf: -extra/protobuf 33.1-3 +Protobuf (see https://archlinux.org/packages/extra/x86_64/protobuf/) You need to run this each time you change `schema.proto` ```bash cd resources/experiments/ protoc --java_out=/home/matt/git/opendc/opendc-common/src/main/java/ schema.proto curl -X DELETE http://localhost:8081/subjects/postgres-topic-value -``` - -Postgresql: - -extra/postgresql 18.1-2 - -```bash -initdb -D /var/lib/postgres/data -mkdir /run/postgresql/ -cd /run/postgresql/ -touch .s.PGSQL.5432.lock -chown -R postgres:postgres /run/postgresql -``` - -Random -```bash -bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres_topic -bin/kafka-topics.sh --create --topic postgres_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 -bin/kafka-topics.sh --list --bootstrap-server localhost:9092 -bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres_topic --from-beginning -``` +```
\ No newline at end of file diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts index 9fe4710c..dc323140 100644 --- a/opendc-common/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -68,6 +68,9 @@ dependencies { implementation("io.javalin:javalin:6.7.0") + // Source: https://mvnrepository.com/artifact/redis.clients/jedis + implementation("redis.clients:jedis:7.3.0") + } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt new file mode 100644 index 00000000..cc89d48f --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt @@ -0,0 +1,50 @@ +package org.opendc.common.utils + +import java.io.File +import java.net.URI +import java.net.http.* +import java.net.http.HttpResponse.BodyHandlers.ofString +/** + * Singleton class representing the real datacenter client. + * The client is asynchronous and initiates the connection first. + * + * @constructor Initiates the connection. + * + * @author Mateusz Kwiatkowski + */ + +public class HTTPClient private constructor() { + public companion object { + private var instance: HTTPClient? = null + private var client: HttpClient? = null + private var handshake = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:8080/")) + .build() + + public fun getInstance(): HTTPClient? { + if (instance == null) { + try { + client = HttpClient.newBuilder().build() + val response = client?.send(handshake, ofString()) + check(response?.statusCode() == 200) + } catch (e: IllegalStateException) { + println("${e.message}") + } + instance = HTTPClient() + } + return instance + } + } + + // TODO: this class must send the experiment JSON file to the digital twin + public fun sendExperiment(experiment: File) { + val body : HttpRequest.BodyPublisher + val request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:8080/")) + .header("Content-type", "application/json") + // TODO: this is obviously wrong, find an efficient way to send JSON over network + .POST(HttpRequest.BodyPublishers.ofString(experiment)) + .build() + println("Haha") + } +}
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt index 9db7bfaf..a43e23a8 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt @@ -1,25 +1,50 @@ package org.opendc.common.utils import io.javalin.Javalin -import io.javalin.http.Context import io.javalin.http.Handler +/** + * Represents the digital twin monitoring server. + * @author Mateusz Kwiatkowski + * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a> + */ + public class JavalinRunner { + private val handshake: Handler = Handler { ctx -> ctx.status(200) } - private val handleHello: Handler = Handler { ctx -> - ctx.status(200) - ctx.contentType("application/x-protobuf") - ctx.result("Hello world") + private val scenario: Handler = Handler { ctx -> } init { + // Make a CRUD RESTful API + // Specify server config val app = Javalin.create().start() - app.get("/hello", handleHello) + // returns a list of all experiments + app.get("/experiment", handshake) + + // returns a specific experiment + app.get("/experiment/:id", handshake) + + // you need another endpoint for the metrics + + // get the results for the metrics evaluation + app.get("/results/:id", handshake) + + // returns all results + app.get("/results", handshake) + + // sends a specific experiment + app.post("/experiment", scenario) + + // changes a specific experiment + app.put("/experiment/:id", scenario) + // this should delete metrics associated with the experiment + + // deletes an experiment with id + app.delete("/experiment/:id", scenario) - app.exception<Exception?>(Exception::class.java, { e: Exception?, ctx: Context? -> - e!!.printStackTrace() - ctx!!.status(500) - }) + // deletes all experiments + app.delete("/experiment", scenario) } }
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt index d7ccd385..1430898e 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -38,19 +38,4 @@ public class Kafka(private val topic: String) { } } } - - // TODO: fix - public fun getReceive() : () -> Unit { - val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties) - return fun() : Unit { - try { - consumer.subscribe(listOf(topic)) - while (true) { - consumer.poll(1.microseconds.toJavaDuration()) - } - } catch (e: Exception) { - println("${e.message}") - } - } - } }
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt index 03fd902c..35d03feb 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt @@ -47,7 +47,6 @@ public class PostgresqlDB { private fun String.asJdbc(database : String) : String { return "jdbc:postgresql://$this/$database" - } }
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt new file mode 100644 index 00000000..67547778 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt @@ -0,0 +1,44 @@ +package org.opendc.common.utils + +import com.fasterxml.jackson.dataformat.toml.TomlMapper +import redis.clients.jedis.RedisClient +import redis.clients.jedis.StreamEntryID +import redis.clients.jedis.params.XReadParams +import java.util.Properties + +/** + * This class represents the Redis server instance. + * @author Mateusz Kwiatkowski + * @see <a href=https://redis.io/docs/latest/>https://redis.io/docs/latest/</a> + * + * @see <a href=https://redis.io/docs/latest/develop/data-types/streams/>https://redis.io/docs/latest/develop/data-types/streams/</a> + */ + +@Suppress("DEPRECATION") +public class Redis { + private var properties : Properties + + init { + properties = TomlMapper().readerFor(Properties().javaClass) + .readValue(Kafka::class.java.getResource("/producer.toml")) + } + + public fun run() { + val jedis : RedisClient = RedisClient.create("redis://localhost:6379") + + val res5 = jedis.xread( + XReadParams.xReadParams().block(300).count(100), + object : HashMap<String?, StreamEntryID?>() { + init { + put("${properties.getProperty("table")}", StreamEntryID()) + } + }) + + // in Redis you can subscribe to updates to a stream. + // you should base your application off this. + // you can listen for new items with XREAD + println(res5) + jedis.close() + } + +}
\ No newline at end of file diff --git a/opendc-common/src/main/resources/database.toml b/opendc-common/src/main/resources/database.toml index 35e1d159..c9aaa253 100644 --- a/opendc-common/src/main/resources/database.toml +++ b/opendc-common/src/main/resources/database.toml @@ -2,4 +2,4 @@ "username" = "matt" "password" = "admin" "database" = "opendc" -"table" = "postgres_topic"
\ No newline at end of file +"stream" = "postgres_topic"
\ No newline at end of file diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt index a4c4209c..baadd806 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt @@ -22,10 +22,21 @@ package org.opendc.compute.simulator.scheduler +import org.opendc.common.utils.HTTPClient import org.opendc.compute.simulator.service.HostView import org.opendc.compute.simulator.service.ServiceTask -public class SmartScheduler() : ComputeScheduler { +public class SmartScheduler : ComputeScheduler { + private val client = HTTPClient.getInstance() + // the point is that a smart scheduler listens for suggestions from the + // digital twin + // and here is where you change your actions based on the result from the DT + // predictive analytics is going to be much easier to do. + // you will completely overcome the overhead of having to tap-into + // the digital twin mid-through the simulation/in between two hosts being scheduled + // i.e., the normal simulation will NOT have to wait. + // predictive analytics will overcome the problem of the scheduling time overhead + override fun addHost(host: HostView) { TODO("Not yet implemented") } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt index 5dfa21c5..c8368af2 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt @@ -25,9 +25,8 @@ package org.opendc.compute.simulator.telemetry import org.opendc.common.ProtobufMetrics import org.opendc.common.utils.Kafka import org.opendc.compute.simulator.telemetry.table.host.HostTableReader - /** - * This class logs data from the simulator into Kafka. + * This class logs data from the simulator into a Kafka topic. * The data uses the Protobuf format. * * @author Mateusz Kwiatkowski @@ -50,6 +49,7 @@ public class KafkaComputeMonitor : ComputeMonitor { .setEnergyusage(reader.energyUsage) .build() this.send(packet) + } catch (e: Exception) { println("${e.message}") } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index f3e634e6..ad662f25 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -28,6 +28,7 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.defaultLazy import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file +import org.opendc.common.utils.HTTPClient import org.opendc.experiments.base.experiment.getExperiment import java.io.File import java.io.IOException @@ -40,10 +41,6 @@ public fun main(args: Array<String>) { else ExperimentListener().main(args) } -/** - * Opens a client socket from `config`, but otherwise works as before. - * @author Mateusz - */ internal class ExperimentCommand : CliktCommand(name = "experiment") { private val experimentPath by option("--experiment-path", help = "path to experiment file") .file(canBeDir = false, canBeFile = true) @@ -52,6 +49,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { override fun run() { try { val experiment = getExperiment(experimentPath) + HTTPClient.getInstance()?.sendExperiment(experimentPath) runExperiment(experiment) } catch (e: IOException) { @@ -60,9 +58,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { } } - /** - * Parses CLI arguments. + * Entry point to the digital twin. * * @author Mateusz Kwiatkowski */ diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt index a777262f..8eab48e6 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt @@ -26,6 +26,7 @@ import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle import org.opendc.common.utils.JavalinRunner import org.opendc.common.utils.PostgresqlDB +import org.opendc.common.utils.Redis import org.opendc.experiments.base.experiment.Scenario /** @@ -62,13 +63,16 @@ public fun runExperiment(experiment: List<Scenario>) { } /** - * Accepts a HTTP server and listens for requests. + * Established a connection with PostgreSQL. + * Creates a Javalin HTTP server and listens for requests. * * @author Mateusz Kwiatkowski * - * @see <a href=https://ktor.io/docs>https://ktor.io/docs</a> + * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a> */ public fun runListener() { PostgresqlDB() JavalinRunner() + Redis().run() + println("Hello world, this means that Javalin already runs on another thread.") }
\ No newline at end of file diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index ab25ef25..5a9a4f3c 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -191,7 +191,6 @@ public fun addExportModel( ) { /* - * @author Mateusz * Here is the entry point to KafkaComputeMonitor(). * With this setting, the simulator no longer writes to parquet files. * To get back the original code, refer to https://github.com/atlarge-research/opendc diff --git a/run.sh b/run.sh deleted file mode 100755 index ed300fc2..00000000 --- a/run.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/zsh -kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone -kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties -schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties -connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties |
