diff options
Diffstat (limited to 'opendc-compute')
| -rw-r--r-- | opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt new file mode 100644 index 00000000..d0be10db --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt @@ -0,0 +1,50 @@ +package org.opendc.compute.simulator.telemetry + + +import com.fasterxml.jackson.databind.ObjectMapper +import org.opendc.common.utils.Config +import org.opendc.common.utils.Kafka +import org.opendc.common.ProtobufMetrics + +import org.opendc.compute.simulator.telemetry.table.host.HostTableReader + +/** + * @author Mateusz + * This class logs data from the simulator into Kafka. + */ +public class KafkaComputeMonitor: ComputeMonitor { + private val metrics : MonitoringMetrics = MonitoringMetrics() + private val kafka : Kafka? = Config.getKafkaInstance() + + @Override + override fun record(reader: HostTableReader) { + metrics.id += 1 + metrics.timestamp = reader.timestamp.toEpochMilli() + metrics.tasksActive = reader.tasksActive + metrics.clusterName = reader.hostInfo.clusterName + + try{ + val packet = ProtobufMetrics.ProtoExport.newBuilder() + .setId(metrics.id) + .setTasksactive(metrics.tasksActive) + .build() + kafka?.send(packet) + } + + catch(e: Exception){ + println("${e.message}") + } + } + +} + +/** + * @author Mateusz + * This serves as editable data class for ObjectMapper(). + */ +public class MonitoringMetrics { + public var id: Int = 0 + public var timestamp: Long = 0 + public var tasksActive : Int = 0 + public var clusterName: String = "" +} |
