温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

kafka流量监控的原理及实现方法是什么

发布时间:2021-11-15 14:51:04 来源:亿速云 阅读:299 作者:柒染 栏目:大数据

本篇文章给大家分享的是有关kafka流量监控的原理及实现方法是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

工程能力

作为一个优秀的开发人员,项目开发的过程中监控告警系统的可靠性是可以体现出一个人的工程管理能力的。优秀的监控告警系统可以免去很多精力消耗,比如维护,故障预判,故障及时准确通知,故障定位排查等。

可以想像项目上线后,假如没有监控告警系统,这么一个暗箱是多么可怕。

对于大数据项目,数据一般需要先入消息队列,如kafka,然后分离线和实时将数据进行解耦分流,用于实时处理和离线处理。消息队列存在的好处:

  • 消息队列的订阅者可以根据需要随时扩展,可以很好的扩展数据的使用者。


  • 消息队列的横向扩展,增加吞吐量,做起来还是很简单的。这个用传统数据库,分库分表还是很麻烦的。


  • 由于消息队列的存在,也可以帮助我们抗高峰,避免高峰时期后端处理压力过大导致整个业务处理宕机。

kafka在大数据项目中作用至关重要,那么对其的监控告警就至关重要了,我们这里主要是讲针对kafka流量的监控告警,其目的也是很明显的便于我们了解数据的整体情况及波动情况,以调整处理后端,如spark streaming,flume等。

kafka 监控工具很多,常见的有kafka manager,KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最经常使用的是kafka manager,也建议大家使用该工具,其不仅有监控功能还有管理功能。具体使用方法可以参看:

kafka管理神器-kafkamanager

监控指标

kafka的指标服务器和客户端都有的。具体指标内容,可以参看kafka官网:

http://kafka.apache.org/0102/documentation.html#monitoring

查看可用指标的最简单方法是启动jconsole并将其指向正在运行的kafka客户端或服务器; 这将允许使用JMX浏览所有指标。

对于熟悉kafka manager的朋友都应该看过broker相关信息,比如每秒钟的流入的消息条数,每秒钟的流入的消息大小,流出的消息大小等。

使用kafka manager可以很方便的查看。但是,这其实不能让我们及时的发现数据流量波动,或者说我们想画个曲线的详细对比历史流量,它是做不到的。所以,我们要想办法去获取出来这些指标,然后做我们自己的展示。还有一点就是,流量波动告警。

浪尖这里只做了图中几个指标的接口:

   
     
   
   
   
def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)  }
 def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)  }
 def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)  }
 def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)  }
 def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)  }
 def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {    getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)  }
           

jmx客户端

连接jmx的server是可以使用jconsole,但是满足不了我们的需求。所以,我们使用JMXConnectorFactory 方式连接jmx。使用JMXConnectorFactory 链接jmx时,JMXServiceURL 的参数 url 必须使用 service:jmx 方式进行连接,具体链接创建方式很简单,几行代码而已,如下:


val jmxHost = "hostname"val jmxPort = 9999val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"val url = new JMXServiceURL(urlString)val jmxc = JMXConnectorFactory.connect(url )
val mbsc = jmxc.getMBeanServerConnection;
println(KafkaMetrics.getMessagesInPerSec(Kafka_0_10_2_1,mbsc,Some("test")).fifteenMinuteRate)jmxc.close()

开启kafka的jmx端口

kafka的jmx服务默认时关闭的,开启的话很简单,只需要在kafka server的启动脚本kafka-server-start.sh里增加一行代码即可,内容export JMX_PORT="9999",增加位置如下:

   
     
   
   
   
if [ "x$KAFKA_HEAP_OPTS" = "x"]; then
  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  export JMX_PORT="9999"
fi
           

测试

我这里测试就比较简单了,主要是将消息条数打出来,大家可以根据需要自行调整,比如均值大于阈值发短信告警等。

kafka流量监控的原理及实现方法是什么

一套完整的kafka监控,包括:

  1. 消费者监控,主要是存活告警,消费滞后告警。

  2. 生产者监控,主要是存活告警,生产者消费上游数据能力告警。

  3. broker监控,主要是存活告警,流量告警,isr列表,topic异常告警,control变换告警。

以上就是kafka流量监控的原理及实现方法是什么,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI