前言:

消息队列主要用于

解耦,消息异步,流量削峰

为什么要用redis实现消息队列,rabbitmq不行吗?

现实情况中,系统有很多情况可以使用消息队列,但是往往不需要rebbitmq的很多功能,引入rebbitmq通常要增加很多成本,Redis作为消息队列使用。

用 Redis 的几种可实施的方案

  • Redis List:Redis的List数据结构可以被用作简单的消息队列。生产者将消息推送到List的尾部,消费者则从List的头部获取消息。这种方式简单易用,但在高并发情况下可能存在性能瓶颈。

  • Redis Pub/Sub:Redis的发布订阅功能可以用来实现消息队列。生产者发布消息到指定的频道,而消费者订阅这个频道以接收消息。虽然简单,但Pub/Sub不支持消息持久化,且消息无法在消费者离线时被保存。

  • Redis Streams: Redis 5.0引入了Streams数据结构,它提供了更复杂的消息队列功能,包括持久性、分组、消息消费确认等功能。因此,Redis Streams通常被视为更适合用作消息队列的数据结构。

Redis Streams 特点和好处

  1. 持久性:Redis Stream 支持消息持久化,可以将消息存储在内存中,也可以选择将消息保存到磁盘上,保证消息的持久性。

  2. 多消费者分组:Stream 支持将消费者分组,每个消费者组内的消费者可以共享一个消息流,多个消费者组可以并行消费消息。

  3. 消息消费确认:消费者可以对已经处理的消息进行确认,这样可以确保消息被正确处理,避免消息的重复消费。

  4. 消息时间序列:消息在 Stream 中是有序存储的,每条消息都有唯一的ID和时间戳,可以按照时间顺序进行消费。

  5. 复杂数据结构:Stream 支持复杂的消息结构,每个消息可以包含多个字段和值,这样可以存储更加丰富的消息内容。

  6. 消费者消费位置:消费者可以控制自己的消费位置,可以从头开始消费,也可以从某个特定的消息ID开始消费。

1.引入Redis依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置yaml文件

redis:
  mq:
    stream:
      # 生产者
      keyName: stream:card
      errorKeyName: stream:card:error
      # 消费组
      group:
        groupName: card_group
        # 消费者
        consumers:
            # 可以配置多个监听lei
          - consumerName: card_consumers
            # 监听类
            listenerClass: com.test.redismq.listener.CardReadRedisListener
          - consumerName: card_consumers1
            # 监听类
            listenerClass: com.test.redismq.listener.CardReadRedisListener1
          - consumerName: card_consumers2   

3.配置实体类

RedisMq

@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
@Order(99)//定义加载顺序,值越大越先加载
@Data
public class RedisMq {
    private RedisMqStream stream;
}

RedisMqConsumers

@Data
public class RedisMqConsumers {
    private String consumerName;
    private String listenerClass;
}

RedisMqGroup

@Data
public class RedisMqGroup {
    private String groupName;
    private List<RedisMqConsumers> consumers;
}

RedisMqStream

@Data
public class RedisMqStream {
    private String keyName;
    private String errorKeyName;
    private RedisMqGroup group;
}

StreamErrorHandler

@Slf4j
public class StreamErrorHandler implements ErrorHandler {
    //配置错误
    @Override
    public void handleError(Throwable t) {
        log.error(t.getMessage());
    }
}

4.redisConfig(重点)

具体步骤如下:

  1. 配置线程池:根据CPU核心数配置线程池,用于处理消息消费任务。

  2. 配置监听容器:设置监听容器的批量大小、超时时间、错误处理器等参数。

  3. 初始化消费主题和组信息:检查并创建Redis流主题和消费组。

  4. 创建监听容器:使用配置好的参数创建监听容器。

  5. 订阅消息:通过监听容器订阅指定主题的消息,并指定消息处理器。

  6. 启动监听:启动监听容器,开始消费消息。

@Component
public class RedisAutoConfiguration {

    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisMq redisMq;
   
    
    @Bean
    @Order(300)
    public List<Subscription> subscription(RedisConnectionFactory factory){
        List<Subscription> resultList = new ArrayList<>();
        
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("redis-stream-");
        executor.initialize();

        // 创建选项构建器:初始化 StreamMessageListenerContainerOptions 的构建器,用于        
        //后续配置:
        //设置批量大小:指定每次从Redis流中读取的消息数量为5条,减少频繁的I/O操作。
        //设置线程池:指定用于处理消息的线程池,确保消息处理的并发性和性能。
        //设置轮询超时时间:指定轮询Redis流的超时时间为1秒,避免长时间等待无消息的情况。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(100)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(5))
                        .errorHandler(new StreamErrorHandler())
                        .build();
       // 遍历所有RedisMqStream
        RedisMq redisMq = SpringUtil.getBean(RedisMq.class);
        RedisMqStream redisMqStream = redisMq.getStream();
        String keyName = redisMqStream.getKeyName();
        RedisMqGroup group = redisMqStream.getGroup();
        if (Objects.isNull(redisMqStream) || Objects.isNull(redisMqStream.getKeyName())||   Objects.isNull(redisMqStream.getGroup())) {
            log.warn("RedisMqStream or keyName or group is null, please check your configuration.");
            return resultList;
        }
        initStream(keyName, group.getGroupName());
        // 初始化Stream,创建Stream并创建对应的消费组
        // 创建Stream监听容器
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
                StreamMessageListenerContainer.create(factory, options);

        // 遍历每个消费者配置
        for (RedisMqConsumers redisMqConsumers : group.getConsumers()) {
            Consumer consumer = Consumer.from(group.getGroupName(), redisMqConsumers.getConsumerName());

            String listenerClass = redisMqConsumers.getListenerClass();

            if (Objects.isNull(listenerClass)){
                log.warn("redis stream listenerClass is null");
                continue;
            }
            // 动态创建监听器实例
            StreamListener listener = null;
            try {
                Class<?> aClass = Class.forName(listenerClass);

                listener = (StreamListener) aClass.getConstructor().newInstance();
            } catch (Exception e) {
                log.warn(e.getMessage());
            }
            /**
             * 订阅
             */
            if (null == listener) {
//                log.error("添加订阅监听器【{}】失败", listenerClass);
                continue;
            }
            Subscription subscription = listenerContainer.receive(consumer,
                    StreamOffset.create(keyName, ReadOffset.lastConsumed()), listener);
            resultList.add(subscription);
            log.info("添加订阅监听器【{}】成功", listenerClass);
        }

        // 启动监听容器
        listenerContainer.start();
        return resultList;
    }

    private void initStream(String key, String group){
        boolean hasKey = RedisUtil.hasKey(key);
        if(!hasKey){

            Map<String,Object> map = new HashMap<>(1);
            map.put("field","value");
            //创建主题
            String result = RedisUtil.addMap(key, map);
            //创建消费组
            RedisUtil.createGroup(key,group);
            //将初始化的值删除掉
            RedisUtil.del(key,result);
        }
    }

}

4.创建工具类

@Component
public class RedisUtil {

    private static RedisTemplate redisTemplate;

    @Autowired
    public static void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }

    public static boolean sendMessage(String key, String message){
        // 创建消息记录
        ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(message) // 消息内容
                .withStreamKey(key); // Stream 键

        // 发送消息
        RecordId recordId = redisTemplate.opsForStream().add(record);

        return !Objects.isNull(recordId);
    }

    public static boolean sendMessage(String key, Map<String, String> message){
        RecordId recordId = null;
        try {
            // 发送消息
            recordId = redisTemplate.opsForStream().add(key, message);
        }catch (Exception e){
            return false;
        }
        return !Objects.isNull(recordId);
    }

    /**
     * 读取消息
     * @param: key
     * @return java.util.List<org.springframework.data.redis.connection.stream.MapRecord<java.lang.String,java.lang.Object,java.lang.Object>>
     */
    public static List<MapRecord<String, Object, Object>> read(String key){
        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }

    public static List<MapRecord<String, Object, Object>> batchRead(String key, String consumerGroup, String consumerName){

        // 从 Stream 中读取消息
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                Consumer.from(consumerGroup, consumerName),
                StreamReadOptions.empty().count(10),
                StreamOffset.create(key, ReadOffset.lastConsumed())
        );

        return messages;
    }

    public static List<MapRecord<String, String, String>> batchRead(String key, Integer ...size){

        int batchSize = Objects.isNull(size) ? 10 : size[0];

        // 从 Stream 中读取消息
        List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().read(
                StreamReadOptions.empty().count(batchSize),
                StreamOffset.create(key, ReadOffset.from("0-0"))
        );

        return messages;
    }

    /**
     * 查询指定 Stream 中的消息数量
     *
     * @param streamKey Stream 的名称
     * @return 消息数量
     */
    public static long getStreamMessageCount(String streamKey) {
        // 使用 RedisTemplate 执行 XLEN 命令
        Long messageCount = redisTemplate.opsForStream().size(streamKey);
        return messageCount != null ? messageCount : 0;
    }

    /**
     * 确认消费
     * @param streamKey
     * @param consumerGroup
     * @param recordIds
     * @return java.lang.Long
     */
    public static boolean acknowledge(String streamKey, String consumerGroup, RecordId... recordIds){
        Long acknowledge = redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, recordIds);
        return !Objects.isNull(acknowledge);
    }

    /**
     * 创建消费组
     * @param streamKey stream-key值
     * @param consumerGroup 消费组
     * @return java.lang.String
     */
    public static boolean createGroup(String streamKey, String consumerGroup){
        try {
            String group = redisTemplate.opsForStream().createGroup(streamKey, consumerGroup);
            return !StrUtil.isBlank(group);
        }catch (Exception e){
            return false;
        }
    }

    /**
     * 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
     * @param: key
     * @param: recordIds
     * @return java.lang.Long
     */
    public static Long del(String key, RecordId... recordIds){
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    public static Long del(String key, String... recordIds){
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    /**
     * 获取消费者信息
     * @param key stream-key值
     * @param group 消费组
     * @return org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers
     */
    public static StreamInfo.XInfoConsumers queryConsumers(String key, String group){
        return redisTemplate.opsForStream().consumers(key, group);
    }

    /**
     * 添加Map消息
     * @param key stream对应的key
     * @param value 消息数据
     * @return
     */
    public static String addMap(String key, Map<String, Object> value){
        return redisTemplate.opsForStream().add(key, value).getValue();
    }

    public static void batchInsertWithPipeline(Map<String, String> keyValuePairs, long timeout) {
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            keyValuePairs.forEach((key, value) -> {
                redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
            });
            return null;
        });
    }

    /**
     * 批量插入键值对
     *
     * @param keyValuePairs 键值对Map
     */
    public static void batchInsert(Map<String, Object> keyValuePairs) {
        redisTemplate.opsForValue().multiSet(keyValuePairs);
    }

}

5.创建Listener 监听订阅消息 (订阅)

@Slf4j
@Component
public class RedisListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamKey = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息内容
        Map<String, String> msg = message.getValue();
        log.info("接收到【{}】消息===={}",streamKey,msg);

        for (Map.Entry<String, String> entry : msg.entrySet()) {
            String value = entry.getValue();

            boolean sendMessage = true;
            List<AttendanceCardRecordApiDTO> attendanceCardRecordApiDTOS = null;
            RedisMq redisMq = SpringUtil.getBean(RedisMq.class);
            try {
              //TODO 业务实现
            }catch (Exception e){
                // 推送到异常队列
                sendMessage = RedisUtil.sendMessage(redisMq.getStream().getErrorKeyName(), value);
                log.error(e.getMessage());
            }finally {
                // 确认消息消息成功
                if (sendMessage){
                    RedisUtil.acknowledge(streamKey, redisMq.getStream().getGroup().getGroupName(), recordId);
                    RedisUtil.del(streamKey, recordId);
                }
            }
        }
    }
}

6.通过工具类使用

//通过工具类使用
boolean sendMessage = RedisUtil.sendMessage(redisMq.getStream().getKeyName(), message);