温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Apache Pulsar启动了哪些服务

发布时间:2021-12-24 10:34:28 阅读:274 作者:iii 栏目:大数据

这篇文章主要讲解了“Apache Pulsar启动了哪些服务”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Apache Pulsar启动了哪些服务”吧!

1.启动入口

PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务

  1. PulsarService

  2. PulsarAdmin

  3. LocalBookeeperEnsemble

  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务

  1. PulsarService

  2. BookieServer

  3. AutoRecoveryMain

  4. StatsProvider

  5. WorkerService

简单说一些这几个服务

  • WorkerService: Pulsar function 相关,可以不启动

  • PulsarService: 主要的PulsarBroker相关

  • BookieServer: Bookeeper相关

  • AutoRecoveryMain: Bookeeper autorecovery相关

  • StatsProvider: Metric Exporter类似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol处理(kafka协议等)

  2. localZookeeperConnectionProvider
    维护zk session 和zk连接

  3. startZkCacheService

    • LocalZooKeeperCache => LocalZooKeeperCacheService

    • GlobalZooKeeperCache => ConfigurationCacheService

  4. BookkeeperClientFactory
    创建配置Bookkeeper 客户端

  5. managedLedgerClientFactory
    维护一个ManagedLedger的客户端,借用BookkeeperClient

  6. BrokerService
    这个是服务器的主要逻辑了,这个放在后面说

  7. loadManager
    收集集群机器负载,并根据负载情况均衡负载

  8. startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相关

  9. schemaStorage

  10. schemaRegistryService
    上面2个都是和Schema相关的

  11. defaultOffloader
    LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中

  1. WebService

  2. webSocketService
    http,websocket相关

  3. LeaderElectionService
    和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载

  4. transactionMetadataStoreService
    事务相关

  5. metricGenerator
    metric相关

  6. WorkerService
    pulsar function 相关

3. BrokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 网络层配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 绑定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 启动了一堆需要定期执行的任务
        this.startInactivityMonitor();
       // 启动3个schedule任务分别检测
       // 1. 长时间无效的topic
       // 2. 长时间无效的producer(和message去重相关)
       // 3. 长时间无效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。

protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder"new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0404));

        ch.pipeline().addLast("flowController"new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter

而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义

感谢各位的阅读,以上就是“Apache Pulsar启动了哪些服务”的内容了,经过本文的学习后,相信大家对Apache Pulsar启动了哪些服务这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI