这篇文章主要讲解了“Kafka的基本原理是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Kafka的基本原理是什么”吧!
1、什么是Kafka?
Kafka是一个使用Scala编写的消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。
Kafka是一种分布式的,基于发布/订阅的消息系统。
Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。
2、kafka的特性
(1)以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
(2)高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
(3)支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序存储和传输。
(4)同时支持离线数据处理(Offline)和实时数据处理(Online)。
(5)Scale out:支持在线水平扩展。无需停机即可扩展机器。
(6)支持定期删除数据机制。可以按照时间段来删除,也可以按照文档大小来删除。
(7)Consumer采用pull的方式消费数据,消费状态由Consumer控制,减轻Broker负担。
3、Kafka架构
(1)Broker:和RabbitMQ中的Broker概念类似。一个kafka服务器就是一个Broker,而一个kafka集群包含一个或多个Broker。Broker会持久化数据到相应的Partition中,不会有cache压力。
(2)Topic:主题。每条消息都有一个类别,这个类别就叫做Topic。Kafka的Topic可以理解为RabbitMQ的Queue消息队列,相同类别的消息被发送到同一个Topic中,然后再被此Topic的Consumer消费。Topic是逻辑上的概念,而物理上的实现就是Partition。
(3)Partition:分区。分区是物理上的概念,每个Topic包含一个或者多个Partition,每个Partition都是一个有序队列。发送给Topic的消息经过分区算法(可以自定义),决定消息存储在哪一个Partition当中。每一条数据都会被分配一个有序id:Offset。注意:kafka只保证按一个partition中的顺序将消息发给Consumer,不保证一个Topic的整体(多个partition间)的顺序。
(4)Replication:备份。Replication是基于Partition而不是Topic的。每个Partition都有自己的备份,且分布在不同的Broker上。
(5)Offset:偏移量。kafka的存储文件都是用offset来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.log的文件即可。当然the first offset就是00000000000.log。注意:每个Partition中的Offset都是各不影响的从0开始的有序数列。
(6)Producer:消息生产者。
(7)Consumer:消息消费者。Consumer采用pull的方式从Broker获取消息,并且Consumer要维护消费状态,因此Kafaka系统中,业务重心一般都在Consumer身上,而不像RabbitMQ那样Broker做了大部分的事情。
(8)Consumer Group:消费组。每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。每个Topic可以被多个Group订阅,每个Group中可以有多个Consumer。发送到Topic的一条消息会被每个Group中的一个Consumer消费,多个Consumer之间将交错消费整个Topic的消息,实现负载均衡。
(9)Record:消息。每一个消息由一个Key、一个Value和一个时间戳构成。
Kafka内部结构图(图片源于网络)
Kafka拓扑结构图(图片源于网络)
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
(1)每个partition目录相当于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每个segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
(2)每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
(3)segment file组成:由2大部分组成,分别为index file(后缀“.index”)和data file(后缀“.log”),此2个文件一一对应,成对出现。
(4)segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
Segment file结构图(图片来源于网络)
以上述图2中一对segment file文件为例,说明segment中index和log文件对应关系物理结构如下:
Kafka集群Partition分布图1(图片来源于网络)
当集群中新增2节点,Partition增加到6个时分布情况如下:
Kafka集群Partition分布图2(图片来源于网络)
在Kafka集群中,每个Broker都有均等分配Leader Partition机会。
上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
副本分配算法:
(1)将所有n个Broker和待分配的i个Partition排序。
(2)将第i个Partition分配到第(i mod n)个Broker上。
(3)将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上
例如图2中的第三个Partition:partition-2,将被分配到Broker3((3 mod 6)=3)上,partition-2的副本将被分配到Broker4上((3+1) mod 6=4)。
(1)Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。可以设置segment文件大小定期删除和消息过期时间定期删除
(2)通过索引信息可以快速定位message。
(3)通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
(4)通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
对于多个Partition,多个Consumer
(1)如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数。
(2)如果consumer比partition少,一个consumer会对应于多个partition,这里要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目。
(3)如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
(4)增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
(5)High-level接口中获取不到数据的时候是会block的。
关于zookeeper中Offset初始值的问题:
Zookeeper中Offset的初始值默认是非法的,因此通过设置Consumer的参数auto.offset.reset来告诉Consumer读取到Offset非法时该怎么做。
auto.offset.reset有三个值:
(1)smallest : 自动把zookeeper中的offset设为Partition中最小的offset;
(2)largest : 自动把zookeeper中offset设为Partition中最大的offset;
(3)anything else: 抛出异常;
auto.offset.reset默认值是largest,此种情况下如果producer先发送了10条数据到某个Partition,然后Consumer启功后修改zookeeper中非法Offset值为Partition中的最大值9(Offset从0开始),这样Consumer就忽略了这10条消息。就算现在再次设置成smallest也读取不到之前的10条数据了,因为此时Offset是合法的了。
所以,想要读取之前的数据,就需要在一开始指定auto.offset.reset=smallest。
Replication是基于Partition而不是Topic的。每个Partition都有自己的备份,且分布在不同的Broker上。这些Partition当中有一个是Leader,其他都是Follower。Leader Partition负责读写操作,Follower Partition只负责从Leader处复制数据,使自己与Leader保持一致。Zookeeper负责两者间的故障切换(fail over,可以理解为Leader选举)。
消息复制延迟受最慢的Follower限制,Leader负责跟踪所有Follower的状态,如果Follower“落后”太多或者失效,Leader就将此Follower从Replication同步列表中移除,但此时Follower是活着的,并且一直从Leader拉取数据,直到差距小于replica.lag.max.messages值,然后重新加入同步列表。当一条消息被所有的Follower保存成功,此消息才被认为是“committed”,Consumer才能消费这条消息。这种同步方式就要求Leader和Follower之间要有良好的网络环境。
一个partition的follower落后于leader足够多时,会被认为不在同步副本列表或处于滞后状态。在Kafka-0.8.2.x中,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replication响应Leader partition的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,而后者是用来检测失效或死亡的副本。假设replica.lag.max.messages设置为4,表明只要follower落后leader的消息数小于4,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,表明只要follower向leader发送拉取数据请求时间间隔超过500 ms,就会被标记为死亡,并且会从同步副本列表中移除。
当Leader处于流量高峰时,比如一瞬间就收到了4条数据,此时所有Follower将被认为是“out-of-sync”并且从同步副本列表中移除,然后Follower拉取数据赶上Leader过后又重新加入同步列表,就这样Follower频繁在副本同步列表移除和重新加入之间来回切换。
即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可(注意:不同于其他分布式存储,比如hbase需要"多数派"存活才行)。
当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。kafka中leader选举并没有采用"投票多数派"的算法,因为这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,而且kafka集群的设计,还需要容忍N-1个replicas失效。对于kafka而言,每个partition中所有的replicas信息都可以在zookeeper中获得,那么选举leader将是一件非常简单的事情。选择follower时需要兼顾一个问题,就是新leader 所在的server服务器上已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力。在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader。在整个集群中,只要有一个replicas存活,那么此partition都可以继续接受读写操作。
当一个Group中,有Consumer加入或者离开时,会触发Partitions均衡。均衡的最终目的,是提升Topic的并发消费能力。
(1)假如topic1,具有如下partitions: P0,P1,P2,P3
(2)加入group中,有如下consumer: C0,C1
(3)首先根据partition索引号对partitions排序: P0,P1,P2,P3
(4)根据consumer.id排序: C0,C1
(5)计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
(6)然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
通过此算法,就能知道具体Consumer消费的是哪个分区中的数据。
在kafka-Client-0.11.0.0.jar中,提供的有默认的KafkaProducer和DefaultPartitioner实现。其中DefaultPartitioner主要提供了Producer发送消息到分区的路由算法,如果给定Key值,就通过Key的哈希值和分区个数取余来计算;如果没有给定Key,就通过ThreadLocalRandom.current().nextInt()产生的随机数与分区数取余(其中涉及复杂步奏参考如下代码)。具体代码如下:
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<string,> configs) {} /** * 计算给定记录的分区 * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<partitioninfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
我们也可以设置自己的Partition路由规则,需要继承Partitioner类实现
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
Kafka的消息delivery保证主要有三种:
(1)At most once 最多一次。消息可能会丢失,但绝不会重复传输。
(2)At least once 最少一次。消息绝不会丢失,但可能会重复传输。
(3)Exactly once 正好一次。每条消息正好被传输一次和消费一次。
Producer的delivery保证可以通过参数request.required.acks设置来保证:
(1)request.required.acks=0。
相当于消息异步发送。消息一发送完毕马上发送下一条。由于不需要ack,可能会造成数据丢失,相当于实现了At most once。
(2)request.required.acks=1。
消息发送给Leader Partition,在Leader Partition确认消息并ack 生产者过后才发下一条。
(3)request.required.acks=-1。
消息发送给Leader,在Leader收到所有Follower确认保存消息的ack后对producer进行ack才发送下一条。
所以一条消息从Producer到Broker至少是确保了At least once的,因为有Replication的存在,只要消息到达Broker就不会丢失。如果ack出现问题,比如网络中断,有可能会导致producer收不到ack而重复发送消息。Exactly once这种方式,没有查到相关的实现。
第(3)种方式的具体步奏如下:
a. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
b. producer 将消息发送给该 leader
c. leader 将消息写入本地 log
d. followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
e. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
Consumer从Broker拉取数据过后,可以选择commit,此操作会在zookeeper中存下此Consumer读取对应Partition消息的Offset,以便下一次拉取数据时会从Partition的下一个Offset消费,避免重复消费。
同样,Consumer可以通过设置参数enable.auto.commit=true来自动确认消息,即Consumer一收到消息立刻自动commit。如果只看消息的读取过程,kafka是确保了Exactly once的,但是实际情况中Consumer不可能读取到数据就结束了,往往还需要处理读取到的数据。因此Consumer处理消息和commit消息的顺序就决定了delivery保证的类别。
(1)先处理后commit
这种方式实现了At least once。Consumer收到消息先处理后提交,如果在处理完成后机器崩溃了,导致Offset没有更新,Consumer下次启动时又会重新读取上一次消费的数据,实际上此消息已经处理过了。
(2)先commit后处理
这种方式实现了At most once。Consumer收到消息过后立刻commit,更新zookeeper上的Offset,然后再处理消息。如果处理还未结束Consumer崩溃了,等Consumer再次启动的时候会读取Offset更新过后的下一条数据,这就导致了数据丢失。
Kafka提供了两种Consumer API,选用哪种API需要视具体情况而定。
High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Partition的last offset )、Broker失败转移以及增减Partition、Consumer时的负载均衡(当Partition和Consumer增减时,Kafka自动进行Rebalance)。
Low Level Consumer API,作为底层的Consumer API,提供了消费Kafka Message更大的控制,用户可以实现重复读取、跳读等功能。
使用Low Level Consumer API,是没有对Broker、Consumer、Partition增减进行处理,如果出现这些的增减时,需要自己处理负载均衡。
Low Level Consumer API提供更大灵活控制是以增加复杂性为代价的:
(1)Offset不再透明
(2)Broker自动失败转移需要处理
(3)增加Consumer、Partition、Broker需要自己做负载均衡
感谢各位的阅读,以上就是“Kafka的基本原理是什么”的内容了,经过本文的学习后,相信大家对Kafka的基本原理是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。