1 主题
主题名称结构的URL:
{persistent|non-persistent}://tenant/namespace/topic
名称组成 | 说明 |
---|---|
persistent / non-persistent | 持久和非持久 |
tenant | 租户 |
namespace | 命名空间 主题的管理单元,用作相关主题的分组机制。大多数主题配置是在命名空间级别执行的。每个租户可以有多个命名空间 |
topic | 名字的最后一部分 |
无需显式创建新主题
你不需要在Pulsar中显式地创建主题。如果客户端尝试向尚未存在的主题写入消息或接收消息,则Pulsar将自动在主题名称中提供的命名空间下创建该主题
2 命名空间
名称空间是租户中的逻辑术语。租户可以通过管理API创建多个名称空间。例如,具有不同应用程序的租户可以为每个应用程序创建单独的命名空间。命名空间允许应用程序创建和管理主题的层次结构。主题my tenant/app1是我的租户的应用程序app1的命名空间。可以在命名空间下创建任意数量的主题
3 订阅模式
Pulsar有三种订阅模式:独占(exclusive),共享(shared),故障转移(failover)
独占模式是默认订阅模式
共享模式的局限性
使用共享模式时有两件重要的事情需要注意:
无法保证消息排序。
不能将累积确认与共享模式一起使用。
4 密钥共享(Key_shared)
在密钥共享模式下,多个消息者可以同一订阅
密钥共享模式的限制
在使用密钥共享模式时,有两件重要的事情需要注意:
您需要为消息指定密钥或orderingKey
不能将累积确认与密钥共享模式一起使用
密钥共享订阅是一个测试版功能。您可以在broker.config禁用它(Pulsar 2.4.1)
5 多主题订阅
从Pulsar 1.23.0版开始,Pulsar用户可以同时订阅多个主题,两种方式
1)基于正则表达式(regex),例如persistent://public/default/finance-*
2)通过明确定义的topic列表
当通过regex订阅多个主题时,所有主题必须位于同一命名空间中
不能保证顺序性
当消费者订阅多个主题时,Pulsar通常就单个主题提供的所有订购保证都不成立。如果您的Pulsar用例涉及任何严格的订购要求,我们强烈建议您不要使用此功能
下面是一些Java的多主题订阅示例:
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
PulsarClient pulsarClient = // Instantiate Pulsar client object
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(someTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
6分区主题
分区topic和普通topic,对于订阅模式如何工作,没有任何不同。分区只是决定了从生产者生产消息到消费者处理及确认消息过程中发生的事情。
分区topic需要通过admin API指定创建。创建的时候可以指明分区的数量。
路由模式
模式 | 说明 |
---|---|
RoundRobinPartition | 如果没有提供密钥,则生产者将以循环方式发布所有分区上的消息,以实现最大吞吐量。请注意,循环调度不是针对单个消息执行的,而是设置为批处理延迟的相同边界,以确保批处理有效。如果在消息上指定了密钥,则分区的生产者将散列该密钥并将消息分配给特定的分区。这是默认模式。 |
SinglePartition | 如果没有提供密钥,生产者将随机选择一个单独的分区并将所有消息发布到该分区中。如果在消息上指定了密钥,则分区的生产者将散列该密钥并将消息分配给特定的分区。 |
CustomPartition | 使用将被调用的自定义消息路由器实现来确定特定消息的分区。用户可以通过使用Java客户端并实现MessageRouter接口来创建自定义路由模式。 |
消息顺序
消息的顺序与消息路由模式和消息密钥有关。通常,用户需要按密钥分区保证排序。
如果消息附加了密钥,则在使用SinglePartition或RoundRobinPartition模式时,将根据哈希方案将消息路由到相应的分区
顺序规则 | 说明 | 路由模式和key |
---|---|---|
按key分发(Per-key-partition) | 所有具有相同key的消息都将按顺序排列并放置在同一个分区中。 | 使用SinglePartition或RoundRobinPartition模式,每个消息都提供key |
按生产者分发(Per-producer) | 来自同一个生产者的所有消息都将按顺序排列 | 使用SinglePartition模式,并且没有为每个消息提供key |
7 非持久性主题
格式如下:
non-persistent://tenant/namespace/topic
生产者和消费者可以以与持久主题相同的方式连接到非持久主题,关键的区别在于主题名称必须以非持久主题开头。所有三种订阅模式(独占、共享和故障转移)都支持非持久性主题。
客户端API
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";
消费
Consumer<byte[]> consumer = client.newConsumer()
.topic(npTopic)
.subscriptionName(subscriptionName)
.subscribe();
生产
Producer<byte[]> producer = client.newProducer()
.topic(npTopic)
.create();
8 消息保留和过期
默认情况下,Pulsar消息服务器:
立即删除消费者已确认的所有消息,以及
将所有未确认的消息持久存储在消息待办事项中。
但是,Pulsar有两个特性,使您能够覆盖此默认行为:
消息保留使您能够存储已由消费者确认的消息
消息过期使您能够为尚未确认的消息设置生存时间(TTL)
9 消息去重
当消息被Pulsar持久化多于一次的时候,会发生数据重复。消息去重是Pulsar可选的特性,阻止不必要的消息重复,每条消息仅处理一次
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。