这篇文章主要讲解了“Apache Pulsar启动了哪些服务”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Apache Pulsar启动了哪些服务”吧!
PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务
PulsarService
PulsarAdmin
LocalBookeeperEnsemble
WorkerService
PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务
PulsarService
BookieServer
AutoRecoveryMain
StatsProvider
WorkerService
简单说一些这几个服务
WorkerService: Pulsar function 相关,可以不启动
PulsarService: 主要的PulsarBroker相关
BookieServer: Bookeeper相关
AutoRecoveryMain: Bookeeper autorecovery相关
StatsProvider: Metric Exporter类似的功能
PulsarService.start
ProtocolHandlers
支持不同protocol处理(kafka协议等)
localZookeeperConnectionProvider
维护zk session 和zk连接
startZkCacheService
LocalZooKeeperCache => LocalZooKeeperCacheService
GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
创建配置Bookkeeper 客户端
managedLedgerClientFactory
维护一个ManagedLedger的客户端,借用BookkeeperClient
BrokerService
这个是服务器的主要逻辑了,这个放在后面说
loadManager
收集集群机器负载,并根据负载情况均衡负载
startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相关
schemaStorage
schemaRegistryService
上面2个都是和Schema相关的
defaultOffloader
LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中
WebService
webSocketService
http,websocket相关
LeaderElectionService
和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载
transactionMetadataStoreService
事务相关
metricGenerator
metric相关
WorkerService
pulsar function 相关
public void start() throws Exception {
// producer id 分布式生成器
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
pulsar.getConfiguration().getClusterName());
// 网络层配置
ServerBootstrap bootstrap = defaultServerBootstrap.clone();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
...
// 绑定端口
listenChannel = bootstrap.bind(addr).sync().channel();
...
// metric
this.startStatsUpdater(
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
// 启动了一堆需要定期执行的任务
this.startInactivityMonitor();
// 启动3个schedule任务分别检测
// 1. 长时间无效的topic
// 2. 长时间无效的producer(和message去重相关)
// 3. 长时间无效的subscription
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = new ServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);
connections.put(ch.remoteAddress(), cnx);
}
这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter
而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义
感谢各位的阅读,以上就是“Apache Pulsar启动了哪些服务”的内容了,经过本文的学习后,相信大家对Apache Pulsar启动了哪些服务这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。