温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

MQ怎么将多消息合并为一条消息发送

发布时间:2021-12-22 11:24:33 来源:亿速云 阅读:162 作者:iii 栏目:大数据

这篇文章主要介绍“MQ怎么将多消息合并为一条消息发送”,在日常操作中,相信很多人在MQ怎么将多消息合并为一条消息发送问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MQ怎么将多消息合并为一条消息发送”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

为什么要将多消息合并为一个消息发送?

前面也说了,为了节约成本。以每分钟50w的广告点击数来算,一个月将产生50*60*24*31w的点击消息,再乘以3就是每个月的sqs请求数,3代表的是发送消息、拉取消息、删除消息,按每100w请求0.4美刀的价格计算大概一个月要26784美刀。

由于sqs限制单条消息的大小最大为256k,根据业务场景估算每点击消息也不可能达到1k,,所以我将256个请求合并为一个消息发送,或者1s内未达到256个消息也合并为一个消息发送,这样每月的费用可以直接除以256,这不是一个小数目。

什么样的业务场景下才适合这么干?

将大量消息合并为一个消息后会导致消息消费失去原子性。你无法保证原本是256个消息的合并为一个消息后,这256个消息能全部消费成功或者全部消费失败,因此要求业务必须允许消息消费失败直接丢弃的情况。无论多少个成功多少个失败,都需要将整条消息从mq中删除。笔者考虑过这个问题才决定是否要这样做的,也考虑过失败重试的问题,但我觉得没必要为这种概率买单,因为一个点击在非异步的情况下,失败就是失败了。

如何将大量消息合并为一条消息发送而不影响服务的高并发性能呢?

其实不影响是不存在的,只是让影响变得微弱。经过长时间的观察,我了解该高并发服务对内存的消耗并不高,最大qps下也就消耗1.5g左右的堆内存,而netty使用的直接内存大概在2g这样,对于2核8g的机器,有足够多的内存来实现队列缓存消息。

我借签Dubbo的客户端与服务端配置多个连接时使用轮询方式使用连接,同时也借签了netty的EventLoop的设计,实现消息合并发送。我定义一个MesaageLoopGroup,一个MesaageLoopGroup可以配置有多少个MesaageLooper,而每个MesaageLooper就是一个线程,且维护一个阻塞队列,默认队列大小是102400,这个数字是我配置单个进程所能打开的最大文件句柄数。

当往MesaageLoopGroup push一个点击消息时,先用原子类自增1与MesaageLooper数组的长度取余,选出一个MesaageLooper。然后再将消息push到这个MesaageLooper的阻塞队列。

MQ怎么将多消息合并为一条消息发送

每个MesaageLooper的run方法实现的就是一个死循环,从阻塞队列中拿消息,当消息等于256时,或者阻塞超过1s就将拿到的消息合并成一个消息发送到mq。如果阻塞队列满,那么push会直接将消息发送到mq。因此,服务重启时如果使用kill 9强行结束进程,至多只会有1s的数据丢失。设置1s还有一个原因就是控制消息的实时性。

灰度上线测试一天后也证明此方案对服务的影响并不大,无论是gc还是内存占用,都看不出加了这么一层逻辑。1s的平均请求按50w计算,四台机器分担,每个服务的每秒请求数平均是2000。

为何用golang实现消费者?

然而消息的消费并不顺利。一个是因为消息消费我用了golang实现,我也是刚入门,写起代码来还感觉别扭,二是一个消息是由原本256个消息组合而成的问题。

使用golang其实是有原因的。原本计划是让消费者占用较小的内存,以实现将消费者寄生在其它服务所在的机器上,充分利用其它耗内存而cpu利用率低的服务所在的机器。同时利用docker实现快速部署,让docker 的镜像更小,不需要安装jdk什么的。还有就是利用go的协程并发处理能力吧,让消费者消费消息的速度能赶上消息的产生速度。

为入门golang买单

为了便于理解,我还是以java的线程池来说明。假设我配置的线程池线程数量是512。寄生在其它服务的机器上需要给主人点面子,不能把人家的cpu全部吃完,导致主服务不可用,所以线程的数量结合消息的消费情况综合考虑,不能超过一半的cpu使用率,而选择512这个数量。

Sqs支持一次拉取多条消息,并且有一个可见性超时的特性,当消息被消费者拉取到之后,在多长时间内未删除,下次可能还会被拉取到,或者其它消费者还能拉取到。最初我设置的可见性超时是60s。

MQ怎么将多消息合并为一条消息发送

一开始我开启5个线程拉取消息,每次最多拉取10条消息。那么很可能同一时间内会拉取到50条消息。由于一条消息是由原本256条消息合并而成的,所以512个线程同一时间段至多只能消费2条消息,而一条消息(合并后的)的消费平均耗时是10s,也就是说一分钟内最多消费12条消息,其它38条消息在一分钟后会被其它消费者拉取到,所以就会出现大量消息重复消费的情况,久而久之,消息越积累越多。

我用golang的channel实现生产者与消费者,channel的大小可设置,当channel满时,拉取到的消息是放不进channel的,因此会将拉取线程阻塞住,只有消费者从 channel取数据才能继续放入。但阻塞的那段时间要小于消息的可见性超时,因为消息只有在开始消费时我才会将其从mq中删除。

后面的改进就是根据消费能力去调整消息的拉取线程数,以及每次拉取的消息数。还有一点要注意,为保证时刻有消息准备就绪开始消费,最好不要让消息消费完再从mq中拉取。但这也会导致另一个问题,一些消息拉取到本地后,由于channel已满,放不进,而其它空闲消费节点又拉不到,导致消息被消费到的时间延长。这就需要作出取舍。

到此,关于“MQ怎么将多消息合并为一条消息发送”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mq
AI