summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authormjkwiatkowski <mati.rewa@gmail.com>2026-02-16 15:18:21 +0100
committermjkwiatkowski <mati.rewa@gmail.com>2026-02-16 15:18:21 +0100
commit2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (patch)
tree672d98baa2ac071f2c30de06d613254d0d8cd105 /opendc-compute
parent86d35fcec83057e346e4982b5a6908f25342a392 (diff)
feat: opendc -> kafka -> postgresql works; added protobuf encodingHEADmaster
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt50
1 files changed, 50 insertions, 0 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
new file mode 100644
index 00000000..d0be10db
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
@@ -0,0 +1,50 @@
+package org.opendc.compute.simulator.telemetry
+
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.opendc.common.utils.Config
+import org.opendc.common.utils.Kafka
+import org.opendc.common.ProtobufMetrics
+
+import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
+
+/**
+ * @author Mateusz
+ * This class logs data from the simulator into Kafka.
+ */
+public class KafkaComputeMonitor: ComputeMonitor {
+ private val metrics : MonitoringMetrics = MonitoringMetrics()
+ private val kafka : Kafka? = Config.getKafkaInstance()
+
+ @Override
+ override fun record(reader: HostTableReader) {
+ metrics.id += 1
+ metrics.timestamp = reader.timestamp.toEpochMilli()
+ metrics.tasksActive = reader.tasksActive
+ metrics.clusterName = reader.hostInfo.clusterName
+
+ try{
+ val packet = ProtobufMetrics.ProtoExport.newBuilder()
+ .setId(metrics.id)
+ .setTasksactive(metrics.tasksActive)
+ .build()
+ kafka?.send(packet)
+ }
+
+ catch(e: Exception){
+ println("${e.message}")
+ }
+ }
+
+}
+
+/**
+ * @author Mateusz
+ * This serves as editable data class for ObjectMapper().
+ */
+public class MonitoringMetrics {
+ public var id: Int = 0
+ public var timestamp: Long = 0
+ public var tasksActive : Int = 0
+ public var clusterName: String = ""
+}