背景
在开发中,往往会遇到一些关于延时任务的需求。例如
•生成订单30分钟未支付,则自动取消
•生成订单60秒后,给用户发短信
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。
最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。
实现过程
说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:
1、ScheduledThreadPoolExecutor决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。
主要的延时实现如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy()); //从消息中取出延迟时间及相关信息的代码略 int delayTime = 0; executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //具体操作逻辑 }},0,delayTime, TimeUnit.SECONDS);
其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。
然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。
大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
ScheduledThreadPoolExecutor由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。
DelayWrokQueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致OOM,在使用多线程时我们是肯定要考虑到OOM的可能性的,因为OOM带来的后果往往比较严重,系统OOM临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。
2、采用redis和线程结合
这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多OOM的问题。
相关redis zset api:
//添加元素 ZADD key score member [[score member] [score member] …] //根据分值及限制数量查询 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] //从zset中删除指定成员 ZREM key member [member …]
我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:
public void onMessage(String topic, String message) { String orderId; int delayTime = 0; try { Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() { }.getType()); if (msgMap.isEmpty()) { return; } LOGGER.info("onMessage kafka content:{}", msgMap.toString()); orderId = msgMap.get("orderId"); if(StringUtils.isNotEmpty(orderId)){ delayTime = Integer.parseInt(msgMap.get("delayTime")); Calendar calendar = Calendar.getInstance(); //计算出预计发送时间 calendar.add(Calendar.MINUTE, delayTime); long sendTime = calendar.getTimeInMillis(); RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId); LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime); } } catch (Exception e) { LOGGER.info("onMessage 延时发送异常:{}", e); } }
public void run(){ //获取批量大小 int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100")); try { //批量获取离发送时间最近的orderNum条数据 Calendar calendar = Calendar.getInstance(); long now = calendar.getTimeInMillis(); //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况) Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum); LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds)); if (CollectionUtils.isNotEmpty(orders)){ //删除key 防止重复发送 for (String orderId : orderIds) { RedisUtils.getInstance().zrem(Constant.DELAY, orderId); } //接下来执行发送等业务逻辑 } } catch (Exception e) { LOGGER.warn("task.run exception:{}", e); } }
至此完成了依赖redis和线程完成了延时发送的功能。
结语
那么对上面两种不同的实现方式进行一下优缺点比较:
综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。