一口气整理一波延时队列实现方案 延时队列java
wptr33 2024-12-17 16:47 31 浏览
作者:kevinkrcai,腾讯IEG后台开发工程师
一、前言
前段时间参与了海外客服系统相关需求的开发工作,其中需要实现客服对用户消息回复超时的相关处理策略。比如:当客服接收到用户消息后,如果在3分钟内没有回复用户,则需要给用户推送一个问卷表单;当客服30分钟还没有回复用户消息,则需要给对应的客服发告警消息等。这类属于在当前时间点往后推迟指定时间执行的任务,可以采用延时队列实现此类功能,本文对延时队列的相关实现方案做一个简单的整理和总结。
二、相关实现
延时队列,就是具有延时功能的数据结构
2.1 基于内存数据结构实现
基于链表实现
基于链表实现的话,链表的每一个元素都是一个延迟任务,包含延时执行时间和延时任务的创建时间,旁路开启一个时间检测线程去轮询遍历链表,只要有当前时间减去延时任务的创建时间大于或等于延迟时间,任务就可以执行,执行完后从链表删除该任务,因此,新增任务的时间复杂度可以做到O(1),但是删除和检测任务是否能够执行时间复杂度则是O(N)。
上面是比较简单的实现思路,但是每次检测任务是否能够执行需要遍历整个链表,时间复杂度位O(N),如果在新增任务时,就根据执行时间的先后做排序,让延时时间最短的任务排在链表头部,那么检测任务是否能够执行和删除延时任务的时间复杂度能优化为O(1),相对前者较好,但是在新增任务时因需要做排序,所以时间复杂度会变为O(N)。
基于最小堆实现
基于排序链表的实现可以给我们提供了一种思路,将任务按照执行时间排序,所以我们可以考虑排序时间复杂度更优的数据结构:最小堆。JDK JUC包内置的延时队列DelayQueue就是基于最小堆实现。最小堆能保证每次新增任务,延时时间最近的排在堆的根结点,检测任务是否能够执行只需要检查根节点的任务是否到期了就行,检测任务时间复杂度是O(1),但是新增和删除任务,因为要做堆排序,所以时间复杂度是O(logN)
基于时间轮实现
什么是时间轮?
时间轮是一个存储定时任务的环状数组,相对上面提到的基于最小堆和链表的实现,时间轮可以做到新增,删除和检测任务的时间复杂度都是O(1)。
最简单的时间轮是单层时间轮,如下图所示,图中有8个时间格,每个时间格表示1s,那么这个时间轮就是表示周期为8s的时间轮,定时任务存放在时间轮每一个格子上,时间过去1s,则当前时间的指针则会向前移动一个单位时间。
延时任务如何存放在时间轮上:
计算某一个定时任务应该存放在时间轮哪一个下标下:
((延时任务执行时间 - 当前时间) / 时间轮单位时间 + 当前时间在时间轮的索引下标) % 时间周期
比如:当前时间00:03,现在有一个在00:04执行的任务,那么计算后得出,(1 / 1 + 3) % 8 = 4,定时任务应该存放在时间轮下标为4的格子上。同样道理,00:10执行的任务,就需要放在下标为2的格子上,可以复用之前的时间轮格子。
如果一个时间格子上刚好有多个任务,怎么执行?
如果延时执行的时间不同,但是下标计算的结果相同,放在了同一个时间格中,可以在这个任务中多一个round字段,表示要多转几圈,例如:当前时间是00:03,有两个延时任务,一个是00:10执行,一个是00:18执行,那他们都会放在下标为2的格子上,但是00:18执行的任务,round = 1,表示要多转一圈。只有round = 0的任务才是当前时间周期可以执行的。这些任务可以在对应的时间格中通过链表连接起来,每次对时间格的处理还需要遍历链表更新任务的round,超出当前时间周期的任务的round字段可以在减1,所以这块的时间复杂度是O(n)。
上面在检测任务的时候,还需要更新对应时间格链表的round字段,时间复杂度是O(n),通过层级时间轮可以将该复杂度优化为O(1),层级时间轮就是有多个时间轮,但是每个时间轮的单位时间是不一样的,如图所示:第一层时间轮的单位时间是1s,时间周期是0到8s,第二层的单位时间等于第一层的时间周期即8s,第二层的时间周期是0到64秒。这样对于超出第一层时间轮时间周期的任务就可以放在第二层。第一层转一圈,第二层转一格。
如下图所示:可以一个三层时间轮表示一天24小时,根据定时任务执行时间放置在不同的时间轮上,比如:02时00分01秒执行的任务比下面两个时间周期大,需要放在hours时间轮上,当current hour指向2,当前时间轮需要下降到下一层时间轮,直到下降到最后一层时间轮为止。
这类基于内存数据结构实现的延迟队列不支持分布式或者持久化,进程重启后数据会丢失,并且当延时任务很多时,队列中对象也会很多,不适合任务量比较大的情况,只适合不需要持久化且任务量相对较少的单机任务,并且能容忍丢失。
2.2 基于DB定时轮询实现
基于DB+定时任务轮询的实现方案通常就是通过一个线程定时去扫描数据库表,找到可以执行的任务触发执行。基于DB轮询,虽然可以解决任务持久化,但是也有以下几个缺点:
- 首先,定时轮询就会存在延迟,这个延迟的误差就在于你设定的轮询的间隔时间。比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 第二,扫描数据表对DB有压力,如果任务数量多,表的数据量大,定时扫表会增加DB磁盘IO压力。
- 第三,对自身服务器,定时轮询也会增加CPU的开销。
2.3 基于Redis实现
基于Redis发布订阅实现
延时消息事件通过 Redis 的pub/sub来进行分发, 设置key的过期时间是延迟时间,利用key过期事件通知来实现延迟消息。因为事件监听配置默认关闭,故开启 redis 的事件监听与发布,需修改redis.conf配置文件:
# notify-keyspace-events ""
notify-keyspace-events Ex
连接redis,订阅监听expire事件:当key过期后,就会监听到过期的key
PSUBSCRIBE __keyevent@*__:expired
如图所示:监听5秒后过期的key:
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func SetExpireEvent() {
// 设置一个键,并且3秒钟之后过期
redisCli.Set(context.Background(), "test_expire_event_notify", "测试键值过期通知", 3*time.Second)
fmt.Println(time.Now())
}
func SubExpireEvent() {
// 订阅key过期事件
sub := redisCli.PSubscribe(context.Background(), "__keyevent@*__:expired")
for {
msg := <- sub.Channel()
fmt.Println("Channel ", msg.Channel)
fmt.Println("pattern ", msg.Pattern)
fmt.Println("pattern ", msg.Payload) // 过期的key
fmt.Println(time.Now())
}
}
func main() {
SetExpireEvent()
go SubExpireEvent()
select {}
}
这种方案缺点在于:
- 第一:redis官方文档中也明确指出,redis从未保证key过期后会立马删除key并立即发送过期通知,因为redis实现key过期的方式是定时随机扫描key,发现过期则删除或者在访问key的时候再去检查key是否过期,过期则删除,所以消息的延迟是必然存在的。
- 第二:redis的事件消息是通过Pub/Sub发出来的,Pub/Sub消息不会做持久化,也没有消息的Ack机制,那样如果消费者没有ack就挂掉或者中间断链,重启后无法再处理这个消息,导致消息丢失。
基于Redis的ZSet实现
Redis的ZSet是基于score进行排序的有序数据结构,因此可以将延迟消息的到期时间戳作为score放到zset中,定时轮询存放延时任务的Zset,取出ZSet中最近要执行的延时任务和当前时间比较,小于等于当前时间即可以执行该任务。新增和查询任务的时间复杂度都是O(logN)。
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func addTask() {
// 添加一个三秒的定时任务
redisCli.ZAdd(context.Background(),"first-job",&redis.Z{Score:float64(time.Now().Add(3 * time.Second).Unix()),Member: 1})
fmt.Println("添加定时任务: ",time.Now())
}
func getTaskFromZset() {
for {
luaScript := `
local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES', 'LIMIT', 0, 1);
if #message > 0 then
redis.call('ZREM', KEYS[1], message[1]);
return message[1];
else
return nil;
end
`
res, err := redis.NewScript(luaScript).Run(context.Background(), redisCli, []string{"first-job"},strconv.FormatInt(time.Now().Unix(), 10)).Result()
if err != nil {
continue
}
// 执行延迟任务
fmt.Println("res:",res)
fmt.Println("执行延迟任务:",time.Now())
}
}
func main() {
addTask()
go getTaskFromZset()
select {}
}
基于Redis ZSet实现相对简单,但也有几点需要注意:
- 第一就是延时消息处理的原子性:轮询线程从Zset获取最近要执行的任务执行并将该任务移除出Zset这几个步骤整体需要封装为一个原子操作,否则服务多实例环境下,可能被其他服务的轮询线程消费到同一任务。
- 第二,定时轮训Zset可能会出现很多空轮训消耗服务器CPU
2.4 基于MQ实现
基于Redis ZSet实现延时队列是一种理解起来比较简单直观并且可以快速落地的方案,但是需要我们自己去实现延迟消息的检测,利用消息队列对延迟消息的支持,我们可以只关心延迟消息的业务逻辑,而不需要关心延迟消息什么时候可以执行。
基于Pulsar延时消费实现
Pulsar最早是在2.4.0引入了延迟消息投递的特性,在Pulsar中使用延迟消息,可以精确指定延迟投递的时间,有deliverAfter和deliverAt两种方式。其中deliverAt可以指定具体的时间戳;deliverAfter可以指定在当前多长时间后执行。
Pulsar延迟消息投递:
// 发送消息,5分钟后执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAfter: 5 * time.Minute,
})
// 发送消息,在指定时间执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAt: time.Now().Add(time.Second * 10),
})
Pulsar延迟消息实现原理:Pulsar中的延迟消息是基于Netty中的时间轮去实现的,通过时间轮去推动时间,到达对应时间后去寻找对应的消息索引,延迟消息的索引通过优先级队列维护,优先队列根据延时时间排序,维护在Broker内存中。找到消息后在发给Consumer去消费。 Pulsar延迟消息有以下不足之处:
- 第一:延迟消息索引维护在内存中,一条延迟消息的消息索引由三个long类型的字段组成<延迟执行的时间戳以及用于定位消息的ledgerId和entryId,所以当有大量的延迟消息需要执行或者延迟消息的延迟时间越长,对内存的开销也会越大。
- 第二:如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建延迟消息索引,对于大规模的延时消息,索引重建的时间也会比较长。
我们目前是为海外的超核玩家服务,用户基数比较小,所以我们也选用了腾讯云的TDMQ-Pulsar作为延时队列的实现方案。
基于RabbitMQ实现
基于RabbitMQ实现有两种方式:
- 第一种是基于RabbitMQ的死信队列实现
- 第二种是基于RabbitMQ官方的延迟插件实现
其实RabbitMQ本身是不支持延迟消息的,但是RabbitMQ支持消息和队列TTL和死信队列,因此,可以利用这两个特性实现延迟队列的效果。
什么是死信消息?
在消息生产和消费过程中,当消息出现以下情况,则会变为死信消息:
- 消息被拒绝且不再重复投递
- 消息超时未被消费,TTL过期
- 队列达到最大长度无法正常投递消息
死信消息会被投递到Dead-Letter-Exchange,然后根据绑定的规则转发到队列的死信队列上,监听这些队列就消费消息就可以实现延迟消息的效果。
但是基于这种方式实现的延迟队列有个缺陷:RabbitMQ只会检查第一个消息是否过期,比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,RabbitMQ只会等到第一个消息过期了,才会让接下来的第二个消息过期,因此,可能会出现延迟消息投递不按时。
第一种肯定是不能应用于生产环境的,因此RabbitMQ推出了官方的延迟插件
这个插件其实是在消息发到Exchange后,将延迟消息暂时存放在mnesia(一种分布式数据库),等消息到期后在投递到队列中给消费者去消费。
使用延时插件实现延时队列也存在一些局限:
- 延迟消息存储在 Mnesia 数据库,在集群情况下,数据只会分布在其中一个节点,如果该节点宕机,延时消息不可用,但是恢复之后消息不会丢
- 官方文档也有提到,该插件依赖 Erlang 计时器,设计上并 不适合大量的延迟消息,如:百万级以上的延时消息
- 插件一旦被禁用了,没有被消费的延时消息会丢失。
三、总结
相关推荐
- MySQL进阶五之自动读写分离mysql-proxy
-
自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...
- 3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?
-
引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...
- 一文由浅入深带你完全掌握MySQL的锁机制原理与应用
-
本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...
- 验证Mysql中联合索引的最左匹配原则
-
后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...
- MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)
-
目录1.索引基础...
- 你会看 MySQL 的执行计划(EXPLAIN)吗?
-
SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...
- MySQL 从入门到精通(四)之索引结构
-
索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...
- mysql总结——面试中最常问到的知识点
-
mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...
- mysql总结——面试中最常问到的知识点(2)
-
首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...
- MySQL基础全知全解!超详细无废话!轻松上手~
-
本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...
- 深入剖析 MySQL 中的锁机制原理_mysql 锁详解
-
在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...
- Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析
-
引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...
- MySQL基础篇:DQL数据查询操作_mysql 查
-
一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...
- MySql:索引的基本使用_mysql索引的使用和原理
-
一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
Java面试必考问题:什么是乐观锁与悲观锁
-
如何将AI助手接入微信(打开ai手机助手)
-
redission YYDS spring boot redission 使用
-
SparkSQL——DataFrame的创建与使用
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
如何利用Redis进行事务处理呢? 如何利用redis进行事务处理呢英文
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)