summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt50
1 files changed, 50 insertions, 0 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
new file mode 100644
index 00000000..d0be10db
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
@@ -0,0 +1,50 @@
+package org.opendc.compute.simulator.telemetry
+
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.opendc.common.utils.Config
+import org.opendc.common.utils.Kafka
+import org.opendc.common.ProtobufMetrics
+
+import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
+
+/**
+ * @author Mateusz
+ * This class logs data from the simulator into Kafka.
+ */
+public class KafkaComputeMonitor: ComputeMonitor {
+ private val metrics : MonitoringMetrics = MonitoringMetrics()
+ private val kafka : Kafka? = Config.getKafkaInstance()
+
+ @Override
+ override fun record(reader: HostTableReader) {
+ metrics.id += 1
+ metrics.timestamp = reader.timestamp.toEpochMilli()
+ metrics.tasksActive = reader.tasksActive
+ metrics.clusterName = reader.hostInfo.clusterName
+
+ try{
+ val packet = ProtobufMetrics.ProtoExport.newBuilder()
+ .setId(metrics.id)
+ .setTasksactive(metrics.tasksActive)
+ .build()
+ kafka?.send(packet)
+ }
+
+ catch(e: Exception){
+ println("${e.message}")
+ }
+ }
+
+}
+
+/**
+ * @author Mateusz
+ * This serves as editable data class for ObjectMapper().
+ */
+public class MonitoringMetrics {
+ public var id: Int = 0
+ public var timestamp: Long = 0
+ public var tasksActive : Int = 0
+ public var clusterName: String = ""
+}