前言:
消息队列主要用于
解耦,消息异步,流量削峰
为什么要用redis实现消息队列,rabbitmq不行吗?
现实情况中,系统有很多情况可以使用消息队列,但是往往不需要rebbitmq的很多功能,引入rebbitmq通常要增加很多成本,Redis作为消息队列使用。
用 Redis 的几种可实施的方案
Redis List:Redis的List数据结构可以被用作简单的消息队列。生产者将消息推送到List的尾部,消费者则从List的头部获取消息。这种方式简单易用,但在高并发情况下可能存在性能瓶颈。
Redis Pub/Sub:Redis的发布订阅功能可以用来实现消息队列。生产者发布消息到指定的频道,而消费者订阅这个频道以接收消息。虽然简单,但Pub/Sub不支持消息持久化,且消息无法在消费者离线时被保存。
Redis Streams: Redis 5.0引入了Streams数据结构,它提供了更复杂的消息队列功能,包括持久性、分组、消息消费确认等功能。因此,Redis Streams通常被视为更适合用作消息队列的数据结构。
Redis Streams 特点和好处
持久性:Redis Stream 支持消息持久化,可以将消息存储在内存中,也可以选择将消息保存到磁盘上,保证消息的持久性。
多消费者分组:Stream 支持将消费者分组,每个消费者组内的消费者可以共享一个消息流,多个消费者组可以并行消费消息。
消息消费确认:消费者可以对已经处理的消息进行确认,这样可以确保消息被正确处理,避免消息的重复消费。
消息时间序列:消息在 Stream 中是有序存储的,每条消息都有唯一的ID和时间戳,可以按照时间顺序进行消费。
复杂数据结构:Stream 支持复杂的消息结构,每个消息可以包含多个字段和值,这样可以存储更加丰富的消息内容。
消费者消费位置:消费者可以控制自己的消费位置,可以从头开始消费,也可以从某个特定的消息ID开始消费。
1.引入Redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置yaml文件
redis:
mq:
stream:
# 生产者
keyName: stream:card
errorKeyName: stream:card:error
# 消费组
group:
groupName: card_group
# 消费者
consumers:
# 可以配置多个监听lei
- consumerName: card_consumers
# 监听类
listenerClass: com.test.redismq.listener.CardReadRedisListener
- consumerName: card_consumers1
# 监听类
listenerClass: com.test.redismq.listener.CardReadRedisListener1
- consumerName: card_consumers2
3.配置实体类
RedisMq
@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
@Order(99)//定义加载顺序,值越大越先加载
@Data
public class RedisMq {
private RedisMqStream stream;
}
RedisMqConsumers
@Data
public class RedisMqConsumers {
private String consumerName;
private String listenerClass;
}
RedisMqGroup
@Data
public class RedisMqGroup {
private String groupName;
private List<RedisMqConsumers> consumers;
}
RedisMqStream
@Data
public class RedisMqStream {
private String keyName;
private String errorKeyName;
private RedisMqGroup group;
}
StreamErrorHandler
@Slf4j
public class StreamErrorHandler implements ErrorHandler {
//配置错误
@Override
public void handleError(Throwable t) {
log.error(t.getMessage());
}
}
4.redisConfig(重点)
具体步骤如下:
配置线程池:根据CPU核心数配置线程池,用于处理消息消费任务。
配置监听容器:设置监听容器的批量大小、超时时间、错误处理器等参数。
初始化消费主题和组信息:检查并创建Redis流主题和消费组。
创建监听容器:使用配置好的参数创建监听容器。
订阅消息:通过监听容器订阅指定主题的消息,并指定消息处理器。
启动监听:启动监听容器,开始消费消息。
@Component
public class RedisAutoConfiguration {
@Resource
private RedisStreamUtil redisStreamUtil;
@Resource
private RedisMq redisMq;
@Bean
@Order(300)
public List<Subscription> subscription(RedisConnectionFactory factory){
List<Subscription> resultList = new ArrayList<>();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("redis-stream-");
executor.initialize();
// 创建选项构建器:初始化 StreamMessageListenerContainerOptions 的构建器,用于
//后续配置:
//设置批量大小:指定每次从Redis流中读取的消息数量为5条,减少频繁的I/O操作。
//设置线程池:指定用于处理消息的线程池,确保消息处理的并发性和性能。
//设置轮询超时时间:指定轮询Redis流的超时时间为1秒,避免长时间等待无消息的情况。
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(100)
.executor(executor)
.pollTimeout(Duration.ofSeconds(5))
.errorHandler(new StreamErrorHandler())
.build();
// 遍历所有RedisMqStream
RedisMq redisMq = SpringUtil.getBean(RedisMq.class);
RedisMqStream redisMqStream = redisMq.getStream();
String keyName = redisMqStream.getKeyName();
RedisMqGroup group = redisMqStream.getGroup();
if (Objects.isNull(redisMqStream) || Objects.isNull(redisMqStream.getKeyName())|| Objects.isNull(redisMqStream.getGroup())) {
log.warn("RedisMqStream or keyName or group is null, please check your configuration.");
return resultList;
}
initStream(keyName, group.getGroupName());
// 初始化Stream,创建Stream并创建对应的消费组
// 创建Stream监听容器
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
StreamMessageListenerContainer.create(factory, options);
// 遍历每个消费者配置
for (RedisMqConsumers redisMqConsumers : group.getConsumers()) {
Consumer consumer = Consumer.from(group.getGroupName(), redisMqConsumers.getConsumerName());
String listenerClass = redisMqConsumers.getListenerClass();
if (Objects.isNull(listenerClass)){
log.warn("redis stream listenerClass is null");
continue;
}
// 动态创建监听器实例
StreamListener listener = null;
try {
Class<?> aClass = Class.forName(listenerClass);
listener = (StreamListener) aClass.getConstructor().newInstance();
} catch (Exception e) {
log.warn(e.getMessage());
}
/**
* 订阅
*/
if (null == listener) {
// log.error("添加订阅监听器【{}】失败", listenerClass);
continue;
}
Subscription subscription = listenerContainer.receive(consumer,
StreamOffset.create(keyName, ReadOffset.lastConsumed()), listener);
resultList.add(subscription);
log.info("添加订阅监听器【{}】成功", listenerClass);
}
// 启动监听容器
listenerContainer.start();
return resultList;
}
private void initStream(String key, String group){
boolean hasKey = RedisUtil.hasKey(key);
if(!hasKey){
Map<String,Object> map = new HashMap<>(1);
map.put("field","value");
//创建主题
String result = RedisUtil.addMap(key, map);
//创建消费组
RedisUtil.createGroup(key,group);
//将初始化的值删除掉
RedisUtil.del(key,result);
}
}
}
4.创建工具类
@Component
public class RedisUtil {
private static RedisTemplate redisTemplate;
@Autowired
public static void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtil.redisTemplate = redisTemplate;
}
public static boolean sendMessage(String key, String message){
// 创建消息记录
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject(message) // 消息内容
.withStreamKey(key); // Stream 键
// 发送消息
RecordId recordId = redisTemplate.opsForStream().add(record);
return !Objects.isNull(recordId);
}
public static boolean sendMessage(String key, Map<String, String> message){
RecordId recordId = null;
try {
// 发送消息
recordId = redisTemplate.opsForStream().add(key, message);
}catch (Exception e){
return false;
}
return !Objects.isNull(recordId);
}
/**
* 读取消息
* @param: key
* @return java.util.List<org.springframework.data.redis.connection.stream.MapRecord<java.lang.String,java.lang.Object,java.lang.Object>>
*/
public static List<MapRecord<String, Object, Object>> read(String key){
return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
}
public static List<MapRecord<String, Object, Object>> batchRead(String key, String consumerGroup, String consumerName){
// 从 Stream 中读取消息
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Consumer.from(consumerGroup, consumerName),
StreamReadOptions.empty().count(10),
StreamOffset.create(key, ReadOffset.lastConsumed())
);
return messages;
}
public static List<MapRecord<String, String, String>> batchRead(String key, Integer ...size){
int batchSize = Objects.isNull(size) ? 10 : size[0];
// 从 Stream 中读取消息
List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().read(
StreamReadOptions.empty().count(batchSize),
StreamOffset.create(key, ReadOffset.from("0-0"))
);
return messages;
}
/**
* 查询指定 Stream 中的消息数量
*
* @param streamKey Stream 的名称
* @return 消息数量
*/
public static long getStreamMessageCount(String streamKey) {
// 使用 RedisTemplate 执行 XLEN 命令
Long messageCount = redisTemplate.opsForStream().size(streamKey);
return messageCount != null ? messageCount : 0;
}
/**
* 确认消费
* @param streamKey
* @param consumerGroup
* @param recordIds
* @return java.lang.Long
*/
public static boolean acknowledge(String streamKey, String consumerGroup, RecordId... recordIds){
Long acknowledge = redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, recordIds);
return !Objects.isNull(acknowledge);
}
/**
* 创建消费组
* @param streamKey stream-key值
* @param consumerGroup 消费组
* @return java.lang.String
*/
public static boolean createGroup(String streamKey, String consumerGroup){
try {
String group = redisTemplate.opsForStream().createGroup(streamKey, consumerGroup);
return !StrUtil.isBlank(group);
}catch (Exception e){
return false;
}
}
/**
* 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
* @param: key
* @param: recordIds
* @return java.lang.Long
*/
public static Long del(String key, RecordId... recordIds){
return redisTemplate.opsForStream().delete(key, recordIds);
}
public static Long del(String key, String... recordIds){
return redisTemplate.opsForStream().delete(key, recordIds);
}
/**
* 获取消费者信息
* @param key stream-key值
* @param group 消费组
* @return org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers
*/
public static StreamInfo.XInfoConsumers queryConsumers(String key, String group){
return redisTemplate.opsForStream().consumers(key, group);
}
/**
* 添加Map消息
* @param key stream对应的key
* @param value 消息数据
* @return
*/
public static String addMap(String key, Map<String, Object> value){
return redisTemplate.opsForStream().add(key, value).getValue();
}
public static void batchInsertWithPipeline(Map<String, String> keyValuePairs, long timeout) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
keyValuePairs.forEach((key, value) -> {
redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
});
return null;
});
}
/**
* 批量插入键值对
*
* @param keyValuePairs 键值对Map
*/
public static void batchInsert(Map<String, Object> keyValuePairs) {
redisTemplate.opsForValue().multiSet(keyValuePairs);
}
}
5.创建Listener 监听订阅消息 (订阅)
@Slf4j
@Component
public class RedisListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
// stream的key值
String streamKey = message.getStream();
//消息ID
RecordId recordId = message.getId();
//消息内容
Map<String, String> msg = message.getValue();
log.info("接收到【{}】消息===={}",streamKey,msg);
for (Map.Entry<String, String> entry : msg.entrySet()) {
String value = entry.getValue();
boolean sendMessage = true;
List<AttendanceCardRecordApiDTO> attendanceCardRecordApiDTOS = null;
RedisMq redisMq = SpringUtil.getBean(RedisMq.class);
try {
//TODO 业务实现
}catch (Exception e){
// 推送到异常队列
sendMessage = RedisUtil.sendMessage(redisMq.getStream().getErrorKeyName(), value);
log.error(e.getMessage());
}finally {
// 确认消息消息成功
if (sendMessage){
RedisUtil.acknowledge(streamKey, redisMq.getStream().getGroup().getGroupName(), recordId);
RedisUtil.del(streamKey, recordId);
}
}
}
}
}
6.通过工具类使用
//通过工具类使用
boolean sendMessage = RedisUtil.sendMessage(redisMq.getStream().getKeyName(), message);