延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?
先看看一下业务场景:
通常解决以上问题,最简单直接的办法就是定时去扫表。
扫表存在的问题是:
延时队列能对于上述需求能很好的解决
调研了市场上一些开源的方案,以下:
2.github个人的:https://github.com/ouqiang/delay-queue
1.基于redis实现,redis只能配置一个,如果redis挂了整个服务不可用,可用性差点
2.消费端实现的是拉模式,接入成本大,每个项目都得去实现一遍接入代码
3.在star使用的人数不多,放在生产环境,存在风险,加之对go语言不了解,出了问题难以维护
基本以上原因打算自己写一个,平常使用php多,项目基本redis的zset结构作为存储,用php语言实现 ,实现原理参考了有赞团队:https://tech.youzan.com/queuing_delay/
总体架构
采用master-work架构模式,主要包括6个模块:
环境依赖:PHP 5.4+ 安装sockets,redis,pcntl,pdo_mysql 拓展
create database dq;
#存放告警信息
CREATE TABLE `dq_alert` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`host` varchar(255) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`user` varchar(255) NOT NULL DEFAULT '',
`pwd` varchar(255) NOT NULL DEFAULT '',
`ext` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#存放redis信息
CREATE TABLE `dq_redis` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(200) NOT NULL DEFAULT '',
`t_content` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
#存储注册信息
CREATE TABLE `dq_topic` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(1024) NOT NULL DEFAULT '',
`delay` int(11) NOT NULL DEFAULT '0',
`callback` varchar(1024) NOT NULL DEFAULT '',
`timeout` int(11) NOT NULL DEFAULT '3000',
`email` varchar(1024) NOT NULL DEFAULT '',
`topic` varchar(255) NOT NULL DEFAULT '',
`createor` varchar(1024) NOT NULL DEFAULT '',
`status` tinyint(4) NOT NULL DEFAULT '1',
`method` varchar(32) NOT NULL DEFAULT 'GET',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
在DqConf.php文件中修改php了路径 $logPath
命令:
php DqHttpServer.php --port 8088
访问:http://127.0.0.1:8088,出现配置界面
redis信息格式:host:post:auth 比如 127.0.0.1:6379:12345
php DqInit.php --port 6789
看到如下信息说明启动成功
<?php
include_once 'DqLoader.php';
date_default_timezone_set("PRC");
//可配置多个
$server=array(
'127.0.0.1:6789',
);
$dqClient = new DqClient();
$dqClient->addServer($server);
$topic ='order_openvip_checker'; //topic在后台注册
$id = uniqid();
$data=array(
'id'=>$id,
'body'=>array(
'a'=>1,
'b'=>2,
'c'=>3,
'ext'=>str_repeat('a',64),
),
//可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定
'fix_time'=>date('Y-m-d 23:50:50'),
);
//添加
$boolRet = $dqClient->add($topic, $data);
echo 'add耗时:'.(msectime() - $time)."ms\n";
//查询
$time = msectime();
$result = $dqClient->get($topic, $id);
echo 'get耗时:'.(msectime() - $time)."ms\n";
//删除
$time = msectime();
$boolRet = $dqClient->del($topic,$id);
echo 'del耗时:'.(msectime() - $time)."ms\n";
执行php test.php
默认日志目录在项目目录的logs目录下,在DqConf.php修改$logPath
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
需要安装pthreads拓展:
测试原理:使用多线程模拟并发,在1s内能成功返回请求成功的个数
php DqBench concurrency requests
concurrency:并发数
requests: 每个并发产生的请求数
测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节
qps:2400
如果调用通知接口在超时时间内,没有收到回复认为通知失败,系统会重新把数据放入队列,重新通知,系统默认最大通知10次(可以在Dqconf.php文件中修改$notify_exp_nums)通知间隔为2n+1,比如第一次1分钟,通知失败,第二次3分钟后,直到收到回复,超出最大通知次数后系统自动丢弃,同时发邮件通知
ps:网络抖动在所难免,通知接口如果涉及到核心的服务,一定要保证幂等!!
线上部署了两个实例每个机房部一个,4个redis作存储,服务稳定运行数月,各项指标均符合预期
主要接入业务:
项目地址: https://github.com/chenlinzhong/php-delayqueue
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。