一口气整理一波延时队列实现方案 延时队列java
wptr33 2024-12-17 16:47 37 浏览
作者:kevinkrcai,腾讯IEG后台开发工程师
一、前言
前段时间参与了海外客服系统相关需求的开发工作,其中需要实现客服对用户消息回复超时的相关处理策略。比如:当客服接收到用户消息后,如果在3分钟内没有回复用户,则需要给用户推送一个问卷表单;当客服30分钟还没有回复用户消息,则需要给对应的客服发告警消息等。这类属于在当前时间点往后推迟指定时间执行的任务,可以采用延时队列实现此类功能,本文对延时队列的相关实现方案做一个简单的整理和总结。
二、相关实现
延时队列,就是具有延时功能的数据结构
2.1 基于内存数据结构实现
基于链表实现
基于链表实现的话,链表的每一个元素都是一个延迟任务,包含延时执行时间和延时任务的创建时间,旁路开启一个时间检测线程去轮询遍历链表,只要有当前时间减去延时任务的创建时间大于或等于延迟时间,任务就可以执行,执行完后从链表删除该任务,因此,新增任务的时间复杂度可以做到O(1),但是删除和检测任务是否能够执行时间复杂度则是O(N)。
上面是比较简单的实现思路,但是每次检测任务是否能够执行需要遍历整个链表,时间复杂度位O(N),如果在新增任务时,就根据执行时间的先后做排序,让延时时间最短的任务排在链表头部,那么检测任务是否能够执行和删除延时任务的时间复杂度能优化为O(1),相对前者较好,但是在新增任务时因需要做排序,所以时间复杂度会变为O(N)。
基于最小堆实现
基于排序链表的实现可以给我们提供了一种思路,将任务按照执行时间排序,所以我们可以考虑排序时间复杂度更优的数据结构:最小堆。JDK JUC包内置的延时队列DelayQueue就是基于最小堆实现。最小堆能保证每次新增任务,延时时间最近的排在堆的根结点,检测任务是否能够执行只需要检查根节点的任务是否到期了就行,检测任务时间复杂度是O(1),但是新增和删除任务,因为要做堆排序,所以时间复杂度是O(logN)
基于时间轮实现
什么是时间轮?
时间轮是一个存储定时任务的环状数组,相对上面提到的基于最小堆和链表的实现,时间轮可以做到新增,删除和检测任务的时间复杂度都是O(1)。
最简单的时间轮是单层时间轮,如下图所示,图中有8个时间格,每个时间格表示1s,那么这个时间轮就是表示周期为8s的时间轮,定时任务存放在时间轮每一个格子上,时间过去1s,则当前时间的指针则会向前移动一个单位时间。
延时任务如何存放在时间轮上:
计算某一个定时任务应该存放在时间轮哪一个下标下:
((延时任务执行时间 - 当前时间) / 时间轮单位时间 + 当前时间在时间轮的索引下标) % 时间周期比如:当前时间00:03,现在有一个在00:04执行的任务,那么计算后得出,(1 / 1 + 3) % 8 = 4,定时任务应该存放在时间轮下标为4的格子上。同样道理,00:10执行的任务,就需要放在下标为2的格子上,可以复用之前的时间轮格子。
如果一个时间格子上刚好有多个任务,怎么执行?
如果延时执行的时间不同,但是下标计算的结果相同,放在了同一个时间格中,可以在这个任务中多一个round字段,表示要多转几圈,例如:当前时间是00:03,有两个延时任务,一个是00:10执行,一个是00:18执行,那他们都会放在下标为2的格子上,但是00:18执行的任务,round = 1,表示要多转一圈。只有round = 0的任务才是当前时间周期可以执行的。这些任务可以在对应的时间格中通过链表连接起来,每次对时间格的处理还需要遍历链表更新任务的round,超出当前时间周期的任务的round字段可以在减1,所以这块的时间复杂度是O(n)。
上面在检测任务的时候,还需要更新对应时间格链表的round字段,时间复杂度是O(n),通过层级时间轮可以将该复杂度优化为O(1),层级时间轮就是有多个时间轮,但是每个时间轮的单位时间是不一样的,如图所示:第一层时间轮的单位时间是1s,时间周期是0到8s,第二层的单位时间等于第一层的时间周期即8s,第二层的时间周期是0到64秒。这样对于超出第一层时间轮时间周期的任务就可以放在第二层。第一层转一圈,第二层转一格。
如下图所示:可以一个三层时间轮表示一天24小时,根据定时任务执行时间放置在不同的时间轮上,比如:02时00分01秒执行的任务比下面两个时间周期大,需要放在hours时间轮上,当current hour指向2,当前时间轮需要下降到下一层时间轮,直到下降到最后一层时间轮为止。
这类基于内存数据结构实现的延迟队列不支持分布式或者持久化,进程重启后数据会丢失,并且当延时任务很多时,队列中对象也会很多,不适合任务量比较大的情况,只适合不需要持久化且任务量相对较少的单机任务,并且能容忍丢失。
2.2 基于DB定时轮询实现
基于DB+定时任务轮询的实现方案通常就是通过一个线程定时去扫描数据库表,找到可以执行的任务触发执行。基于DB轮询,虽然可以解决任务持久化,但是也有以下几个缺点:
- 首先,定时轮询就会存在延迟,这个延迟的误差就在于你设定的轮询的间隔时间。比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 第二,扫描数据表对DB有压力,如果任务数量多,表的数据量大,定时扫表会增加DB磁盘IO压力。
- 第三,对自身服务器,定时轮询也会增加CPU的开销。
2.3 基于Redis实现
基于Redis发布订阅实现
延时消息事件通过 Redis 的pub/sub来进行分发, 设置key的过期时间是延迟时间,利用key过期事件通知来实现延迟消息。因为事件监听配置默认关闭,故开启 redis 的事件监听与发布,需修改redis.conf配置文件:
# notify-keyspace-events ""
notify-keyspace-events Ex连接redis,订阅监听expire事件:当key过期后,就会监听到过期的key
PSUBSCRIBE __keyevent@*__:expired如图所示:监听5秒后过期的key:
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func SetExpireEvent() {
// 设置一个键,并且3秒钟之后过期
redisCli.Set(context.Background(), "test_expire_event_notify", "测试键值过期通知", 3*time.Second)
fmt.Println(time.Now())
}
func SubExpireEvent() {
// 订阅key过期事件
sub := redisCli.PSubscribe(context.Background(), "__keyevent@*__:expired")
for {
msg := <- sub.Channel()
fmt.Println("Channel ", msg.Channel)
fmt.Println("pattern ", msg.Pattern)
fmt.Println("pattern ", msg.Payload) // 过期的key
fmt.Println(time.Now())
}
}
func main() {
SetExpireEvent()
go SubExpireEvent()
select {}
}这种方案缺点在于:
- 第一:redis官方文档中也明确指出,redis从未保证key过期后会立马删除key并立即发送过期通知,因为redis实现key过期的方式是定时随机扫描key,发现过期则删除或者在访问key的时候再去检查key是否过期,过期则删除,所以消息的延迟是必然存在的。
- 第二:redis的事件消息是通过Pub/Sub发出来的,Pub/Sub消息不会做持久化,也没有消息的Ack机制,那样如果消费者没有ack就挂掉或者中间断链,重启后无法再处理这个消息,导致消息丢失。
基于Redis的ZSet实现
Redis的ZSet是基于score进行排序的有序数据结构,因此可以将延迟消息的到期时间戳作为score放到zset中,定时轮询存放延时任务的Zset,取出ZSet中最近要执行的延时任务和当前时间比较,小于等于当前时间即可以执行该任务。新增和查询任务的时间复杂度都是O(logN)。
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func addTask() {
// 添加一个三秒的定时任务
redisCli.ZAdd(context.Background(),"first-job",&redis.Z{Score:float64(time.Now().Add(3 * time.Second).Unix()),Member: 1})
fmt.Println("添加定时任务: ",time.Now())
}
func getTaskFromZset() {
for {
luaScript := `
local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES', 'LIMIT', 0, 1);
if #message > 0 then
redis.call('ZREM', KEYS[1], message[1]);
return message[1];
else
return nil;
end
`
res, err := redis.NewScript(luaScript).Run(context.Background(), redisCli, []string{"first-job"},strconv.FormatInt(time.Now().Unix(), 10)).Result()
if err != nil {
continue
}
// 执行延迟任务
fmt.Println("res:",res)
fmt.Println("执行延迟任务:",time.Now())
}
}
func main() {
addTask()
go getTaskFromZset()
select {}
}基于Redis ZSet实现相对简单,但也有几点需要注意:
- 第一就是延时消息处理的原子性:轮询线程从Zset获取最近要执行的任务执行并将该任务移除出Zset这几个步骤整体需要封装为一个原子操作,否则服务多实例环境下,可能被其他服务的轮询线程消费到同一任务。
- 第二,定时轮训Zset可能会出现很多空轮训消耗服务器CPU
2.4 基于MQ实现
基于Redis ZSet实现延时队列是一种理解起来比较简单直观并且可以快速落地的方案,但是需要我们自己去实现延迟消息的检测,利用消息队列对延迟消息的支持,我们可以只关心延迟消息的业务逻辑,而不需要关心延迟消息什么时候可以执行。
基于Pulsar延时消费实现
Pulsar最早是在2.4.0引入了延迟消息投递的特性,在Pulsar中使用延迟消息,可以精确指定延迟投递的时间,有deliverAfter和deliverAt两种方式。其中deliverAt可以指定具体的时间戳;deliverAfter可以指定在当前多长时间后执行。
Pulsar延迟消息投递:
// 发送消息,5分钟后执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAfter: 5 * time.Minute,
})
// 发送消息,在指定时间执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAt: time.Now().Add(time.Second * 10),
})Pulsar延迟消息实现原理:Pulsar中的延迟消息是基于Netty中的时间轮去实现的,通过时间轮去推动时间,到达对应时间后去寻找对应的消息索引,延迟消息的索引通过优先级队列维护,优先队列根据延时时间排序,维护在Broker内存中。找到消息后在发给Consumer去消费。 Pulsar延迟消息有以下不足之处:
- 第一:延迟消息索引维护在内存中,一条延迟消息的消息索引由三个long类型的字段组成<延迟执行的时间戳以及用于定位消息的ledgerId和entryId,所以当有大量的延迟消息需要执行或者延迟消息的延迟时间越长,对内存的开销也会越大。
- 第二:如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建延迟消息索引,对于大规模的延时消息,索引重建的时间也会比较长。
我们目前是为海外的超核玩家服务,用户基数比较小,所以我们也选用了腾讯云的TDMQ-Pulsar作为延时队列的实现方案。
基于RabbitMQ实现
基于RabbitMQ实现有两种方式:
- 第一种是基于RabbitMQ的死信队列实现
- 第二种是基于RabbitMQ官方的延迟插件实现
其实RabbitMQ本身是不支持延迟消息的,但是RabbitMQ支持消息和队列TTL和死信队列,因此,可以利用这两个特性实现延迟队列的效果。
什么是死信消息?
在消息生产和消费过程中,当消息出现以下情况,则会变为死信消息:
- 消息被拒绝且不再重复投递
- 消息超时未被消费,TTL过期
- 队列达到最大长度无法正常投递消息
死信消息会被投递到Dead-Letter-Exchange,然后根据绑定的规则转发到队列的死信队列上,监听这些队列就消费消息就可以实现延迟消息的效果。
但是基于这种方式实现的延迟队列有个缺陷:RabbitMQ只会检查第一个消息是否过期,比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,RabbitMQ只会等到第一个消息过期了,才会让接下来的第二个消息过期,因此,可能会出现延迟消息投递不按时。
第一种肯定是不能应用于生产环境的,因此RabbitMQ推出了官方的延迟插件
这个插件其实是在消息发到Exchange后,将延迟消息暂时存放在mnesia(一种分布式数据库),等消息到期后在投递到队列中给消费者去消费。
使用延时插件实现延时队列也存在一些局限:
- 延迟消息存储在 Mnesia 数据库,在集群情况下,数据只会分布在其中一个节点,如果该节点宕机,延时消息不可用,但是恢复之后消息不会丢
- 官方文档也有提到,该插件依赖 Erlang 计时器,设计上并 不适合大量的延迟消息,如:百万级以上的延时消息
- 插件一旦被禁用了,没有被消费的延时消息会丢失。
三、总结
相关推荐
- oracle数据导入导出_oracle数据导入导出工具
-
关于oracle的数据导入导出,这个功能的使用场景,一般是换服务环境,把原先的oracle数据导入到另外一台oracle数据库,或者导出备份使用。只不过oracle的导入导出命令不好记忆,稍稍有点复杂...
- 继续学习Python中的while true/break语句
-
上次讲到if语句的用法,大家在微信公众号问了小编很多问题,那么小编在这几种解决一下,1.else和elif是子模块,不能单独使用2.一个if语句中可以包括很多个elif语句,但结尾只能有一个else解...
- python continue和break的区别_python中break语句和continue语句的区别
-
python中循环语句经常会使用continue和break,那么这2者的区别是?continue是跳出本次循环,进行下一次循环;break是跳出整个循环;例如:...
- 简单学Python——关键字6——break和continue
-
Python退出循环,有break语句和continue语句两种实现方式。break语句和continue语句的区别:break语句作用是终止循环。continue语句作用是跳出本轮循环,继续下一次循...
- 2-1,0基础学Python之 break退出循环、 continue继续循环 多重循
-
用for循环或者while循环时,如果要在循环体内直接退出循环,可以使用break语句。比如计算1至100的整数和,我们用while来实现:sum=0x=1whileTrue...
- Python 中 break 和 continue 傻傻分不清
-
大家好啊,我是大田。今天分享一下break和continue在代码中的执行效果是什么,进一步区分出二者的区别。一、continue例1:当小明3岁时不打印年龄,其余年龄正常循环打印。可以看...
- python中的流程控制语句:continue、break 和 return使用方法
-
Python中,continue、break和return是控制流程的关键语句,用于在循环或函数中提前退出或跳过某些操作。它们的用途和区别如下:1.continue(跳过当前循环的剩余部分,进...
- L017:continue和break - 教程文案
-
continue和break在Python中,continue和break是用于控制循环(如for和while)执行流程的关键字,它们的作用如下:1.continue:跳过当前迭代,...
- 作为前端开发者,你都经历过怎样的面试?
-
已经裸辞1个月了,最近开始投简历找工作,遇到各种各样的面试,今天分享一下。其实在职的时候也做过面试官,面试官时,感觉自己问的问题很难区分候选人的能力,最好的办法就是看看候选人的github上的代码仓库...
- 面试被问 const 是否不可变?这样回答才显功底
-
作为前端开发者,我在学习ES6特性时,总被const的"善变"搞得一头雾水——为什么用const声明的数组还能push元素?为什么基本类型赋值就会报错?直到翻遍MDN文档、对着内存图反...
- 2023金九银十必看前端面试题!2w字精品!
-
导文2023金九银十必看前端面试题!金九银十黄金期来了想要跳槽的小伙伴快来看啊CSS1.请解释CSS的盒模型是什么,并描述其组成部分。答案:CSS的盒模型是用于布局和定位元素的概念。它由内容区域...
- 前端面试总结_前端面试题整理
-
记得当时大二的时候,看到实验室的学长学姐忙于各种春招,有些收获了大厂offer,有些还在苦苦面试,其实那时候的心里还蛮忐忑的,不知道自己大三的时候会是什么样的一个水平,所以从19年的寒假放完,大二下学...
- 由浅入深,66条JavaScript面试知识点(七)
-
作者:JakeZhang转发链接:https://juejin.im/post/5ef8377f6fb9a07e693a6061目录由浅入深,66条JavaScript面试知识点(一)由浅入深,66...
- 2024前端面试真题之—VUE篇_前端面试题vue2020及答案
-
添加图片注释,不超过140字(可选)1.vue的生命周期有哪些及每个生命周期做了什么?beforeCreate是newVue()之后触发的第一个钩子,在当前阶段data、methods、com...
- 今年最常见的前端面试题,你会做几道?
-
在面试或招聘前端开发人员时,期望、现实和需求之间总是存在着巨大差距。面试其实是一个交流想法的地方,挑战人们的思考方式,并客观地分析给定的问题。可以通过面试了解人们如何做出决策,了解一个人对技术和解决问...
- 一周热门
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)
