summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormjkwiatkowski <mati.rewa@gmail.com>2026-02-23 12:05:58 +0100
committermjkwiatkowski <mati.rewa@gmail.com>2026-02-23 12:05:58 +0100
commit4f816318b6672d40f23b22ca44cc06b77cadf961 (patch)
tree40aa2cae25fee7a92eb36d1d471534a8b53fecd0
parentdaad473975cc3e6eba0536d5a8fe750cf8b2fa7d (diff)
feat: added a working connection to Redis, and a scaffolding for the RESTful APIHEADmaster
-rw-r--r--README.md41
-rw-r--r--opendc-common/build.gradle.kts3
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt50
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt45
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt15
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt1
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt44
-rw-r--r--opendc-common/src/main/resources/database.toml2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt13
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt4
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt9
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt8
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt1
-rwxr-xr-xrun.sh5
14 files changed, 159 insertions, 82 deletions
diff --git a/README.md b/README.md
index 55448b73..429af1d7 100644
--- a/README.md
+++ b/README.md
@@ -1,56 +1,21 @@
### Dependencies
-Paths are hardcoded.
Be aware the project will only work with this OpenDC as there are hidden Maven dependencies.
Kafka topic should be named `postgres_topic`.
Confluent local (see https://www.confluent.io/installation/):
-```bash
-export CONFLUENT_HOME=/opt/confluent
-export PATH=/opt/confluent/bin:$PATH
-cd /opt/confluent
-kafka-storage random-uuid
-kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone
-kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
-schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
-connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties
-```
+PostgreSQL (see https://www.postgresql.org/)
Confluent JDBC sink and source (includes Postgres connector)
(see https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc)
Be mindful to configure the right `plugin.path` in `etc/kafka/connect-standalone.properties`
-```bash
-ln -s /home/matt/git/opendc/resources/experiments/sink-jdbc.properties /opt/confluent/share/confluent-common/connectors
-```
-
-Protobuf:
-extra/protobuf 33.1-3
+Protobuf (see https://archlinux.org/packages/extra/x86_64/protobuf/)
You need to run this each time you change `schema.proto`
```bash
cd resources/experiments/
protoc --java_out=/home/matt/git/opendc/opendc-common/src/main/java/ schema.proto
curl -X DELETE http://localhost:8081/subjects/postgres-topic-value
-```
-
-Postgresql:
-
-extra/postgresql 18.1-2
-
-```bash
-initdb -D /var/lib/postgres/data
-mkdir /run/postgresql/
-cd /run/postgresql/
-touch .s.PGSQL.5432.lock
-chown -R postgres:postgres /run/postgresql
-```
-
-Random
-```bash
-bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres_topic
-bin/kafka-topics.sh --create --topic postgres_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
-bin/kafka-topics.sh --list --bootstrap-server localhost:9092
-bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres_topic --from-beginning
-```
+``` \ No newline at end of file
diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts
index 9fe4710c..dc323140 100644
--- a/opendc-common/build.gradle.kts
+++ b/opendc-common/build.gradle.kts
@@ -68,6 +68,9 @@ dependencies {
implementation("io.javalin:javalin:6.7.0")
+ // Source: https://mvnrepository.com/artifact/redis.clients/jedis
+ implementation("redis.clients:jedis:7.3.0")
+
}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt
new file mode 100644
index 00000000..cc89d48f
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/HTTPClient.kt
@@ -0,0 +1,50 @@
+package org.opendc.common.utils
+
+import java.io.File
+import java.net.URI
+import java.net.http.*
+import java.net.http.HttpResponse.BodyHandlers.ofString
+/**
+ * Singleton class representing the real datacenter client.
+ * The client is asynchronous and initiates the connection first.
+ *
+ * @constructor Initiates the connection.
+ *
+ * @author Mateusz Kwiatkowski
+ */
+
+public class HTTPClient private constructor() {
+ public companion object {
+ private var instance: HTTPClient? = null
+ private var client: HttpClient? = null
+ private var handshake = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:8080/"))
+ .build()
+
+ public fun getInstance(): HTTPClient? {
+ if (instance == null) {
+ try {
+ client = HttpClient.newBuilder().build()
+ val response = client?.send(handshake, ofString())
+ check(response?.statusCode() == 200)
+ } catch (e: IllegalStateException) {
+ println("${e.message}")
+ }
+ instance = HTTPClient()
+ }
+ return instance
+ }
+ }
+
+ // TODO: this class must send the experiment JSON file to the digital twin
+ public fun sendExperiment(experiment: File) {
+ val body : HttpRequest.BodyPublisher
+ val request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:8080/"))
+ .header("Content-type", "application/json")
+ // TODO: this is obviously wrong, find an efficient way to send JSON over network
+ .POST(HttpRequest.BodyPublishers.ofString(experiment))
+ .build()
+ println("Haha")
+ }
+} \ No newline at end of file
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt
index 9db7bfaf..a43e23a8 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt
@@ -1,25 +1,50 @@
package org.opendc.common.utils
import io.javalin.Javalin
-import io.javalin.http.Context
import io.javalin.http.Handler
+/**
+ * Represents the digital twin monitoring server.
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a>
+ */
+
public class JavalinRunner {
+ private val handshake: Handler = Handler { ctx -> ctx.status(200) }
- private val handleHello: Handler = Handler { ctx ->
- ctx.status(200)
- ctx.contentType("application/x-protobuf")
- ctx.result("Hello world")
+ private val scenario: Handler = Handler { ctx ->
}
init {
+ // Make a CRUD RESTful API
+ // Specify server config
val app = Javalin.create().start()
- app.get("/hello", handleHello)
+ // returns a list of all experiments
+ app.get("/experiment", handshake)
+
+ // returns a specific experiment
+ app.get("/experiment/:id", handshake)
+
+ // you need another endpoint for the metrics
+
+ // get the results for the metrics evaluation
+ app.get("/results/:id", handshake)
+
+ // returns all results
+ app.get("/results", handshake)
+
+ // sends a specific experiment
+ app.post("/experiment", scenario)
+
+ // changes a specific experiment
+ app.put("/experiment/:id", scenario)
+ // this should delete metrics associated with the experiment
+
+ // deletes an experiment with id
+ app.delete("/experiment/:id", scenario)
- app.exception<Exception?>(Exception::class.java, { e: Exception?, ctx: Context? ->
- e!!.printStackTrace()
- ctx!!.status(500)
- })
+ // deletes all experiments
+ app.delete("/experiment", scenario)
}
} \ No newline at end of file
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index d7ccd385..1430898e 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -38,19 +38,4 @@ public class Kafka(private val topic: String) {
}
}
}
-
- // TODO: fix
- public fun getReceive() : () -> Unit {
- val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
- return fun() : Unit {
- try {
- consumer.subscribe(listOf(topic))
- while (true) {
- consumer.poll(1.microseconds.toJavaDuration())
- }
- } catch (e: Exception) {
- println("${e.message}")
- }
- }
- }
} \ No newline at end of file
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
index 03fd902c..35d03feb 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
@@ -47,7 +47,6 @@ public class PostgresqlDB {
private fun String.asJdbc(database : String) : String {
return "jdbc:postgresql://$this/$database"
-
}
} \ No newline at end of file
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
new file mode 100644
index 00000000..67547778
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
@@ -0,0 +1,44 @@
+package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import redis.clients.jedis.RedisClient
+import redis.clients.jedis.StreamEntryID
+import redis.clients.jedis.params.XReadParams
+import java.util.Properties
+
+/**
+ * This class represents the Redis server instance.
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://redis.io/docs/latest/>https://redis.io/docs/latest/</a>
+ *
+ * @see <a href=https://redis.io/docs/latest/develop/data-types/streams/>https://redis.io/docs/latest/develop/data-types/streams/</a>
+ */
+
+@Suppress("DEPRECATION")
+public class Redis {
+ private var properties : Properties
+
+ init {
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(Kafka::class.java.getResource("/producer.toml"))
+ }
+
+ public fun run() {
+ val jedis : RedisClient = RedisClient.create("redis://localhost:6379")
+
+ val res5 = jedis.xread(
+ XReadParams.xReadParams().block(300).count(100),
+ object : HashMap<String?, StreamEntryID?>() {
+ init {
+ put("${properties.getProperty("table")}", StreamEntryID())
+ }
+ })
+
+ // In Redis you can subscribe to updates to a stream.
+ // You should base your application on this.
+ // you can listen for new items with XREAD
+ println(res5)
+ jedis.close()
+ }
+
+} \ No newline at end of file
diff --git a/opendc-common/src/main/resources/database.toml b/opendc-common/src/main/resources/database.toml
index 35e1d159..c9aaa253 100644
--- a/opendc-common/src/main/resources/database.toml
+++ b/opendc-common/src/main/resources/database.toml
@@ -2,4 +2,4 @@
"username" = "matt"
"password" = "admin"
"database" = "opendc"
-"table" = "postgres_topic" \ No newline at end of file
+"stream" = "postgres_topic" \ No newline at end of file
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
index a4c4209c..baadd806 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
@@ -22,10 +22,21 @@
package org.opendc.compute.simulator.scheduler
+import org.opendc.common.utils.HTTPClient
import org.opendc.compute.simulator.service.HostView
import org.opendc.compute.simulator.service.ServiceTask
-public class SmartScheduler() : ComputeScheduler {
+public class SmartScheduler : ComputeScheduler {
+ private val client = HTTPClient.getInstance()
+ // the point is that a smart scheduler listens for suggestions from the
+ // digital twin
+ // and here is where you change your actions based on the result from the DT
+ // predictive analytics is going to be much easier to do.
+ // you will completely overcome the overhead of having to tap-into
+ // the digital twin midway through the simulation/in between two hosts being scheduled
+ // i.e., the normal simulation will NOT have to wait.
+ // predictive analytics will overcome the problem of the scheduling time overhead
+
override fun addHost(host: HostView) {
TODO("Not yet implemented")
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
index 5dfa21c5..c8368af2 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
@@ -25,9 +25,8 @@ package org.opendc.compute.simulator.telemetry
import org.opendc.common.ProtobufMetrics
import org.opendc.common.utils.Kafka
import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
-
/**
- * This class logs data from the simulator into Kafka.
+ * This class logs data from the simulator into a Kafka topic.
* The data uses the Protobuf format.
*
* @author Mateusz Kwiatkowski
@@ -50,6 +49,7 @@ public class KafkaComputeMonitor : ComputeMonitor {
.setEnergyusage(reader.energyUsage)
.build()
this.send(packet)
+
} catch (e: Exception) {
println("${e.message}")
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index f3e634e6..ad662f25 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -28,6 +28,7 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.defaultLazy
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.file
+import org.opendc.common.utils.HTTPClient
import org.opendc.experiments.base.experiment.getExperiment
import java.io.File
import java.io.IOException
@@ -40,10 +41,6 @@ public fun main(args: Array<String>) {
else ExperimentListener().main(args)
}
-/**
- * Opens a client socket from `config`, but otherwise works as before.
- * @author Mateusz
- */
internal class ExperimentCommand : CliktCommand(name = "experiment") {
private val experimentPath by option("--experiment-path", help = "path to experiment file")
.file(canBeDir = false, canBeFile = true)
@@ -52,6 +49,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
override fun run() {
try {
val experiment = getExperiment(experimentPath)
+ HTTPClient.getInstance()?.sendExperiment(experimentPath)
runExperiment(experiment)
} catch (e: IOException) {
@@ -60,9 +58,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
}
}
-
/**
- * Parses CLI arguments.
+ * Entry point to the digital twin.
*
* @author Mateusz Kwiatkowski
*/
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index a777262f..8eab48e6 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -26,6 +26,7 @@ import me.tongfei.progressbar.ProgressBarBuilder
import me.tongfei.progressbar.ProgressBarStyle
import org.opendc.common.utils.JavalinRunner
import org.opendc.common.utils.PostgresqlDB
+import org.opendc.common.utils.Redis
import org.opendc.experiments.base.experiment.Scenario
/**
@@ -62,13 +63,16 @@ public fun runExperiment(experiment: List<Scenario>) {
}
/**
- * Accepts a HTTP server and listens for requests.
+ * Establishes a connection with PostgreSQL.
+ * Creates a Javalin HTTP server and listens for requests.
*
* @author Mateusz Kwiatkowski
*
- * @see <a href=https://ktor.io/docs>https://ktor.io/docs</a>
+ * @see <a href=https://javalin.io/documentation>https://javalin.io/documentation</a>
*/
public fun runListener() {
PostgresqlDB()
JavalinRunner()
+ Redis().run()
+ println("Hello world, this means that Javalin already runs on another thread.")
} \ No newline at end of file
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index ab25ef25..5a9a4f3c 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -191,7 +191,6 @@ public fun addExportModel(
) {
/*
- * @author Mateusz
* Here is the entry point to KafkaComputeMonitor().
* With this setting, the simulator no longer writes to parquet files.
* To get back the original code, refer to https://github.com/atlarge-research/opendc
diff --git a/run.sh b/run.sh
deleted file mode 100755
index ed300fc2..00000000
--- a/run.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/zsh
-kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone
-kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
-schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
-connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties