百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Redis 中如何实现的消息队列?实现的方式有几种

wptr33 2024-12-27 17:15 27 浏览

Redis 中实现消息队列的方式有几种

1、使用 List 类型实现

2、使用 ZSet 类型实现

3、使用发布订阅者模式实现消息队列;

4、使用 Stream 实现消息队列。

几种消息队列具体使用和优缺点

1、List 类型实现的方式最为简单和直接,它主要是通过 lpush、rpop 存入和读取实现消息队列的,如下图所示:

lpush 可以把最新的消息存储到消息队列(List 集合)的首部,而 rpop 可以读取消息队列的尾部,这样就实现了先进先出,如下图所示:

优点:使用 List 实现消息队列的优点是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把数据保存至磁盘,这样当 Redis 重启之后,消息不会丢失。

缺点:但使用 List 同样存在一定的问题,比如消息不支持重复消费、没有按照主题订阅的功能、不支持消费消息确认等。

2、ZSet 实现消息队列:它是利用 zadd 和 zrangebyscore 来实现存入和读取消息的。

优点:同样具备持久化的功能

缺点:List 存在的问题它也同样存在,不但如此,使用 ZSet 还不能存储相同元素的值。因为它是有序集合,有序集合的存储元素值是不能重复的,但分值可以重复,也就是说当消息值重复时,只能存储一条信息在 ZSet 中。

3、发布订阅:使用发布和订阅的类型,我们可以实现主题订阅的功能,也就是 Pattern Subscribe 的功能。因此我们可以使用一个消费者“queue_*”来订阅所有以“queue_”开头的消息队列,如下图所示:

优点:可以按照主题订阅方式

缺点:

a、无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;

b、发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后就不能消费之前的历史消息;

c、不支持消费者确认机制,稳定性不能得到保证,例如当消费者获取到消息之后,还没来得及执行就宕机了。因为没有消费者确认机制,Redis 就会误以为消费者已经执行了,因此就不会重复发送未被正常消费的消息了,这样整体的 Redis 稳定性就被没有办法得到保障了。

4、Stream 类型实现:使用 Stream 的 xadd 和 xrange 来实现消息的存入和读取了,并且 Stream 提供了 xack 手动确认消息消费的命令,用它我们就可以实现消费者确认的功能了,使用命令如下:

127.0.0.1:6379> xack mq group1 1580959593553-0

(integer) 1

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:

其中“Group”为群组,消费者也就是接收者需要订阅到群组才能正常获取到消息。

在 Java 代码中使用 List 实现消息队列会有什么问题?应该如何解决?

先看代码部分实现:

import redis.clients.jedis.Jedis;

publicclass ListMQTest {

    public static void main(String[] args){

        // 启动一个线程作为消费者

        new Thread(() -> consumer()).start();

        // 生产者

        producer();

    }

    /**

     * 生产者

     */

    public static void producer() {

        Jedis jedis = new Jedis("127.0.0.1", 6379);

        // 推送消息

        jedis.lpush("mq", "Hello, List.");

    }

    /**

     * 消费者

     */

    public static void consumer() {

        Jedis jedis = new Jedis("127.0.0.1", 6379);

        // 消费消息

        while (true) {

            // 获取消息

            String msg = jedis.rpop("mq");

            if (msg != null) {

                // 接收到了消息

                System.out.println("接收到消息:" + msg);

            }

        }

    }

}

可以看出以上消费者的实现是通过 while 无限循环来获取消息,但如果消息的空闲时间比较长,一直没有新任务,而 while 循环不会因此停止,它会一直执行循环的动作,这样就会白白浪费了系统的资源。

解决办法:借助 Redis 中的阻塞读来替代 rpop 的方法就可以解决此问题

import redis.clients.jedis.Jedis;

public class ListMQExample {

    public static void main(String[] args) throws InterruptedException {

        // 消费者

        new Thread(() -> bConsumer()).start();

        // 生产者

        producer();

    }

    /**

     * 生产者

     */

    public static void producer() throws InterruptedException {

        Jedis jedis = new Jedis("127.0.0.1", 6379);

        // 推送消息

        jedis.lpush("mq", "Hello, Java.");

        Thread.sleep(1000);

        jedis.lpush("mq", "message 2.");

        Thread.sleep(2000);

        jedis.lpush("mq", "message 3.");

    }

    /**

     * 消费者(阻塞版)

     */

    public static void bConsumer() {

        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {

            // 阻塞读

            for (String item : jedis.brpop(0,"mq")) {

                // 读取到相关数据,进行业务处理

                System.out.println(item);

            }

        }

    }

}

使用 brpop 替代 rpop 来读取最后一条消息,就可以解决 while 循环在没有数据的情况下,一直循环消耗系统资源的情况了。brpop 中的 b 是 blocking 的意思,表示阻塞读,也就是当队列没有数据时,它会进入休眠状态,当有数据进入队列之后,它才会“苏醒”过来执行读取任务,这样就可以解决 while 循环一直执行消耗系统资源的问题了。

在程序中如何使用 Stream 来实现消息队列

在开始实现消息队列之前,我们必须先创建分组才行,因为消费者需要关联分组信息才能正常运行,具体实现代码如下:

import com.google.gson.Gson;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.StreamEntry;

import redis.clients.jedis.StreamEntryID;

import utils.JedisUtils;

import java.util.AbstractMap;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

public class StreamGroupExample {

    private static final String _STREAM_KEY = "mq"; // 流 key

    private static final String _GROUP_NAME = "g1"; // 分组名称

    private static final String _CONSUMER_NAME = "c1"; // 消费者 1 的名称

    private static final String _CONSUMER2_NAME = "c2"; // 消费者 2 的名称

    public static void main(String[] args) {

        // 生产者

        producer();

        // 创建消费组

        createGroup(_STREAM_KEY, _GROUP_NAME);

        // 消费者 1

        new Thread(() -> consumer()).start();

        // 消费者 2

        new Thread(() -> consumer2()).start();

    }

    /**

     * 创建消费分组

     * @param stream    流 key

     * @param groupName 分组名称

     */

    public static void createGroup(String stream, String groupName) {

        Jedis jedis = JedisUtils.getJedis();

        jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);

    }

    /**

     * 生产者

     */

    public static void producer() {

        Jedis jedis = JedisUtils.getJedis();

        // 添加消息 1

        Map<String, String> map = new HashMap<>();

        map.put("data", "redis");

        StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);

        System.out.println("消息添加成功 ID:" + id);

        // 添加消息 2

        Map<String, String> map2 = new HashMap<>();

        map2.put("data", "java");

        StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);

        System.out.println("消息添加成功 ID:" + id2);

    }

    /**

     * 消费者 1

     */

    public static void consumer() {

        Jedis jedis = JedisUtils.getJedis();

        // 消费消息

        while (true) {

            // 读取消息

            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,

                    new StreamEntryID().UNRECEIVED_ENTRY);

            // 阻塞读取一条消息(最大阻塞时间120s)

            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,

                    120 * 1000, true, entry);

            if (list != null && list.size() == 1) {

                // 读取到消息

                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容

                System.out.println("Consumer 1 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +

                        " 内容:" + new Gson().toJson(content));

            }

        }

    }

    /**

     * 消费者 2

     */

    public static void consumer2() {

        Jedis jedis = JedisUtils.getJedis();

        // 消费消息

        while (true) {

            // 读取消息

            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,

                    new StreamEntryID().UNRECEIVED_ENTRY);

            // 阻塞读取一条消息(最大阻塞时间120s)

            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,

                    120 * 1000, true, entry);

            if (list != null && list.size() == 1) {

                // 读取到消息

                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容

                System.out.println("Consumer 2 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +

                        " 内容:" + new Gson().toJson(content));

            }

        }

    }

}

以上代码运行结果如下:

消息添加成功 ID:1580971482344-0

消息添加成功 ID:1580971482415-0

Consumer 1 读取到消息 ID:1580971482344-0 内容:{"data":"redis"}

Consumer 2 读取到消息 ID:1580971482415-0 内容:{"data":"java"}

其中,jedis.xreadGroup() 方法的第五个参数 noAck 表示是否自动确认消息,如果设置 true 收到消息会自动确认 (ack) 消息,否则需要手动确认。

可以看出,同一个分组内的多个 consumer 会读取到不同消息,不同的 consumer 不会读取到分组内的同一条消息。

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...