小编给大家分享一下spring kakfa如何集成,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
1.1 kafka-producer.xml配置说明
<!-- spring的属性加载器,加载多个properties文件中的属性 ,
如果只有一个properties文件则用<context />就行了,用了这个加载器过后不用在其他xml中再使用了-->
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/properties/kafka-producer.properties</value>
</list>
</property>
<property name="fileEncoding" value="utf-8" />
</bean>
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}" /> //kafka服务集群
<entry key="group.id" value="${group.id}" /> //分组
<entry key="retries" value="${retries}" /> //重试次数
<entry key="batch.size" value="${batch.size}" /> //批量数量
<entry key="linger.ms" value="${linger.ms}" />
<entry key="buffer.memory" value="${buffer.memory}" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg ref="producerProperties" />
</bean>
<!-- 创建kafkatemplate -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg index="0" ref="producerFactory" />
<constructor-arg index="1" value="true" />
<property name="defaultTopic" value="${defaultTopic}" /> //topic名称
</bean>
<bean id="kafkaProducerServer" class="com.rkhd.ienterprise.kafka.producer.KafkaProducerServer">
<property name="kafkaTemplate" ref="kafkaTemplate"/>
</bean>
1.2 kafka-producer.properties属性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test
1.3 生产端接口封装说明:
1)类名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer
2)方法:
/**
* 发送信息(不分区)
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
*/
public Map<String, Object> sendDefault(Object data);
/**
* 发送信息(不分区)
* @param key 要发送的键
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
*/
public Map<String, Object> sendDefault(Object key, Object data);
/**
* 发送信息(分区)
* @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
* @param key 要发送的键
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
*/
public Map<String, Object> sendDefault(int partitionNum, Object key, Object data);
/**
* 发送信息(不分区)
* @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
*/
public Map<String, Object> sendMessage(String topic, Object data);
/**
* 发送信息(不分区)
* @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
* @param key 要发送的键
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
* */
public Map<String, Object> sendMessage(String topic, Object key, Object data);
/**
* 发送信息(分区)
* @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
* @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
* @param data 要发送的数据
* @return 返回一个map。如果成功code为0,其他则为失败
*/
public Map<String, Object> sendMessage(String topic, Integer partitionNum, Object data);
/**
* 发送信息(分区)
* @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
* @param key 要发送的键
* @param value 要发送的数据
* @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
* @return 返回一个map。如果成功code为0,其他则为失败
* */
public Map<String, Object> sendMessage(String topic, int partitionNum, Object key, Object value);
2.1 kafka-consumer.xml配置说明
<!-- 定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/> //kafka服务集群
<entry key="group.id" value="${group.id}"/> //分组
<entry key="enable.auto.commit" value="${enable.auto.commit}"/> //是否自动提交
<entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/> //自动提交间隔时间
<entry key="session.timeout.ms" value="${session.timeout.ms}"/> //session过期时间
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.rkhd.ienterprise.mq.client.consumer.generalFormula.GeneralFormulaConsumer"/>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="${topicName}"/>
<property name="messageListener" ref="messageListernerConsumerService"/>
<!--<property name="ackMode" value="COUNT"/> //手动提交模式分为三种:(1)模式COUNT:数量达到COUNT时提交;(2)模式TIME:时间达到TIME;(3)模式COUNT_TIME:数量达到COUNT或时间达到TIME是提交;
<property name="ackCount" value="90"/>-->
<!--<property name="ackMode" value="TIME"/>
<property name="ackTime" value="5000"/>-->
</bean>
<!-- 创建单实例KafkaMessageListenerContainer-->
<!--<bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties_trade"/>
</bean>-->
<!-- 创建多实例ConcurrentMessageListenerContainer-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart" >
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="1"/> //配置消费端数量
</bean>
2.2 kafka-consumer.properties属性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test
2.3 消费端接口封装说明
1)类名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient
2)对外提供抽象方法(根据不同的业务实现):
public abstract void onConsumer(ConsumerRecord<String, String> record);
3)实现说明:各业务线通过继承该类实现该抽象方法;
3.1 Kafka的特性
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写
3.2 Kafka架构组件
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和 consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
topic:消息存放的目录即主题
Producer:生产消息到topic的一方
Consumer:订阅topic消费消息的一方
Broker:Kafka的服务实例就是一个broker
3.3 kafka 应用场景
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和storm
事件源
以上是“spring kakfa如何集成”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/jumpLee/blog/1529152