一种异步延迟队列的实现方式调研

一、应用场景
目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大地节省系统的资源,不必轮询数据库处理任务。
目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描330的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!
二、演示处理方式调研
1.delayqueue
实现方式:
jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。
当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。
存在的问题:
单机运行,系统宕机后,无法进行有效的重试
没有执行记录和备份
没有重试机制
系统重启时,会将任务清空!
不能分片消费
优势: 实现简单,无任务时阻塞,节省资源,执行时间准确
2.延迟队列mq
实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitmq、jmq都可以设置延迟消费时间。rabbitmq通过将消息设置过期时间,放入私信队列进行消费实现。
存在的问题:时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列
优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机
3.定时任务
通过定时任务轮询符合条件的数据
缺点:
必须要读业务数据库,对数据库造成一定的压力,
存在延时
一次扫描数据量过大时,占用过多的系统资源。
无法分片消费
 优点:
消费失败后,下次还能继续消费,具备重试能力,
消费能力稳定
4.redis
任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列
优点:
查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高
redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可
无惧机器重启
分布式消费
缺点:
受限于redis性能,并发10w
多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。
5. 时间轮
通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。
缺点:不适合分布式服务的使用,宕机后,会丢失任务。
三、实现目标
兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。
消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。
client支持丰富:支持多重语言。
高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。
实时性:允许存在一定的时间误差。
支持消息删除:业务使用方,可以随时删除指定消息。
支持消费查询
支持手动重试
对当前异步事件的执行增加监控
四、架构设计
五、延迟组件实现方式
1.实现原理
目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,
发送延迟任务时,会根据时间戳+机器ip+queuename+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。
通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。
监控方通过集成ump
消费记录通过redis备份+数据库持久化完成。
通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。
2.消息结构
每个job必须包含以下几个属性:
topic:job类型,即queuename
id:job的唯一标识。用来检索和删除指定的job信息。
delay:job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
body:job的内容,供消费者做具体的业务处理,以json格式存储。
traceid:发送线程的traceid,待后续pfinder支持设置traceid后,可与发送线程公用同一个traceid,便于日志追踪
具体结构如下图表示:
ttr的设计目的是为了保证消息传输的可靠性。
3.数据流转及流程图
基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁
1)支持应用只发布,不消费,达到消息队列的功能。
2)支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。
3)通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。 
4)支持设置超时时间配置,防止消费线程执行过久
瓶颈:消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。
可能出现的情况:因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。
后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。
六、demo示例
增加配置文件
判断是否开启jd.event.enable:true
com.jd.car senna-event 1.0-snapshot 配置jd:senna:event:enable: truequeue:retryeventqueue:bucketnum: 1handlebean: retryhandle消费代码:package com.jd.car.senna.admin.event;import com.jd.car.senna.event.eventhandler;import com.jd.car.senna.event.annotation.sennaevent;import lombok.extern.slf4j.slf4j;import org.springframework.stereotype.component;/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@slf4j@component(retryhandle)public class retryqueueevent extends eventhandler {@overrideprotected void onhandle(string key, string eventtype) {log.info(handler开始消费:{}, key);}@overrideprotected void ondelayhandle(string key, string eventtype) {log.info(delayhandler开始消费:{}, key);}}  
注解形式:
package com.jd.car.senna.admin.event;import com.jd.car.senna.event.eventhandler;import com.jd.car.senna.event.annotation.sennaevent;import lombok.extern.slf4j.slf4j;/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@slf4j@sennaevent(queuename = testqueue, bucketnum = 5,delaybucketnum = 5,delayenable = true)public class testqueueevent extends eventhandler {@overrideprotected void onhandle(string key, string eventtype) {log.info(handler开始消费:{}, key);}@overrideprotected void ondelayhandle(string key, string eventtype) {log.info(delayhandler开始消费:{}, key);}}  
发送代码:
package com.jd.car.senna.admin.controller;import com.jd.car.senna.event.queue.ieventqueue;import lombok.extern.slf4j.slf4j;import org.springframework.context.annotation.lazy;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.responsebody;import org.springframework.web.bind.annotation.restcontroller;import javax.annotation.resource;import java.util.concurrent.completablefuture;/*** @author zly*/@restcontroller@slf4jpublic class democontroller {@lazy@resource(name = testqueue)private ieventqueue eventqueue;@responsebody@getmapping(/api/v1/demo)public string demo() {log.info(发送无延迟消息);eventqueue.push(no delay 5000 millseconds message 3);return ok;}@responsebody@getmapping(/api/v1/demo1)public string demo1() {log.info(发送延迟5秒消息);eventqueue.push( delay 5000 millseconds message,name,1000*5l);return ok;}@responsebody@getmapping(/api/v1/demo2)public string demo2() {log.info(发送延迟到2022-04-02 00:00:00执行的消息);eventqueue.push( delay message,name to 2022-04-02 00:00:00, new date(1648828800000));return ok;} }


60余款机器人首秀工博会意味着什么?
功率分析仪怎么接线_功率分析仪接线图
台积电掌握华为芯片的命运 还是台湾晶圆代工巨头
自研软件赋能弹性体3D打印!清锋LuxStudio&LuxFlow让用户3D打印游刃有余
如何设计用于自动选择性焊接过程的PCB?
一种异步延迟队列的实现方式调研
采用μC/OS-Ⅱ的数据采集系统设计与应用
国产降噪蓝牙耳机推荐 高性能主动降噪耳机品牌推荐
vivo Y52s手机已开启预售
辽宁省工业互联网+安全可控先进制造业数字服务产业峰会在沈顺利召开
冲洗型管道式差压在线密度变送器的原理及设计
未来十年,有望诞生全球芯片制造领域的“代工之王”
昆仑芯2代AI芯片荣获「2022年度AI生产力创新奖」
基于FPGA的高速数据采集系统该怎么设计?
AI工场受邀出席2019年华洽会人工智能论坛
芯片制造商巨资进军移动领域
知识分享:圆形LED显示屏的详细介绍
PC机的CPLD串行通信的特点与程序的编写
积分球分光光度计的原理是什么?
怎样在TextWrangler中编译并运行C代码