温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

rabbitmq的事务机制

发布时间:2021-09-01 17:54:49 来源:亿速云 阅读:198 作者:chen 栏目:大数据

这篇文章主要介绍“rabbitmq的事务机制”,在日常操作中,相信很多人在rabbitmq的事务机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”rabbitmq的事务机制”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

rabbitmq事务机制:

    1:通过事务机制实现

         1:channel.txSelect()声明启动事务模式;

         2 :  channel.txComment()提交事务;

         3:channel.txRollback()回滚事务;

try {
    channel.txSelect(); // 声明事务
    // 发送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事务
} catch (Exception e) {
    channel.txRollback();
} finally {
    channel.close();
    conn.close();
}

    2:通过发送方确认   publisher   confirm   机制实现。

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

        Confirm的三种实现方式:

        方式一:channel.waitForConfirms()普通发送方确认模式;

        方式二:channel.waitForConfirmsOrDie()批量确认模式;

        方式三:channel.addConfirmListener()异步监听发送方确认模式;

        方式一:普通Confirm模式

// 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 创建信道
        Channel channel = conn.createChannel();
// 声明队列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
        channel.confirmSelect();
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        if (channel.waitForConfirms()) {
            System.out.println("消息发送成功" );
        }
        看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。

        方式二:批量Confirm模式

// 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 创建信道
        Channel channel = conn.createChannel();
// 声明队列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            String message = String.format("时间 => %s", new Date().getTime());
            channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        }
        channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
        System.out.println("全部执行完成");
        以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。

        方式三:异步Confirm模式

// 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 创建信道
        Channel channel = conn.createChannel();
// 声明队列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            String message = String.format("时间 => %s", new Date().getTime());
            channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        }
//异步监听确认和未确认的消息
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("未确认消息,标识:" + deliveryTag);
            }
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
            }
        });

rabbitmq 消息分发

RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
        很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有 个消费者,那么 RabbitM会将第 条消息分发给第 m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并己
        经确认 (Basic.Ack) 了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消
        息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了
        所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
        那么该如何处理这种情况呢?这里就要用到 channel.basicQos(int prefetchCoun这个方法,如前面章节所述, channel.basicQos 方法允许限制信道上的消费者所能保持的最未确认消息的数量。
        举例说明,在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后阅了某个队列进行消费。 RabbitM 会保存一个消费者的列表,每发送一条消息都会为对应的费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。
        直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,
        直到再次到达计数上限。这种机制可以类比于 TCP!IP中的"滑动窗口"
        注意要点:
        Basic.Qos 的使用对于拉模式的消费方式无效.
        channel.basicQos 有三种类型的重载方法:
        (1) void basicQos(int prefetchCount) throws IOException;
        (2) void basicQos( nt prefetchCount , boo1ean globa1) throws IOExcepti(3) void basicQos(int prefetchSize , int prefetchCount , boo1ean global) IOException ;
        前面介绍的都只用到了 prefetchCount 这个参数,当 prefetchCount 设置 没有上限。还有 prefetchSize 这个参数表示消费者所能接收未确认消息的总体大小的上单位为 ,设置为 则表示没有上限。
        对于 个信道来说,它可以同时消费多个队列,当设置了 prefetchCount 大于 时信道需要和各个队列协调以确保发送的消息都没有超过所限定的 prefetchCount 的值,RabbitM 的性能降低,尤其是这些队列分散在集群中的多个 Broker 节点之中。Rabbit提升相关的性能,在 AMQPO-9-1 协议之上重新定义了 global 这个参数,对比如表 4- 所4-1 global 参数的对比
        global 参数
        false:信道上新的消费者需要遵从 prefetchCount 的限定值true:信道上所有的消费者都需要遵从 prefetchCount的限定值

rabbitmq 消息顺序消费:

        rabbitmq重发,死信队列,延时,及网络闪断都会造成生产顺序乱序,故不支持顺序消费。

        可以在业务上自定义排序值,当接受到的排序值与下一个预估要消费的值不一致是,就等待。

rabbitmq 消息可靠性保障:

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消传输保障分为三个层级。
{> At most once: 最多一次。消息可能会丢失,但绝不会重复传输
    At least once: 最少一次。消息绝不会丢失,但可能会重复传输。
    Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。
    RabbitMQ 支持其中的"最多一次 "和"最少一次"。其中"最少 次"投递实现需要考以下这个几个方面的内容:
(1)消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消 息可以可靠地输到 RabbitMQ 中。
    (2) 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换路由到队列中,进而能够保存下来而不会被丢弃。
    3) 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不• 90 • Rabbi{MQ 进阶
    造成消息丢失
            (4) 消费者在消费消息的同时需要将 autoAck 设置为 false ,然后通过手动确认的方式去
    确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。
    "最多 次"的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这
            样很难确保消息不会丢失
    "恰好 次"是 RabbitMQ 目前无法保障的。考虑这样一种情况,消费者在消费完一条消息
    之后向 RabbitMQ 发送确认 Basic.Ack 命令,此时由于网络断开或者其他原因造成 RabbitMQ
    并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,
    消费者还是会消费到这 条消息,这就造成了重复消费。再考虑 种情况,生产者在使用
    ublisher confirm 机制的时候,发送完 条消息等待 RabbitMQ 返回确认通知,此时网络断开,
    生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样
    的消息,在消费的时候,消费者就会重复消费
    那么 RabbitMQ 有没有去重的机制来保证"恰好一次"呢?答案是并没有,不仅是 RabbitMQ
    目前大多数 流的消息中间件都没有消息去重机制,也不保障"恰好 次"。去重处理 般是在
    业务客户端实现,比如引入 GUID (Globally Unique Identifier) 的概念。针对 GUID ,如果从客
    户端的角度去 ,那么 要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以
    界定 建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备
    等'性,或者借助 Redis 等其他产品进行去重处理。

到此,关于“rabbitmq的事务机制”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI