Browse Source

!332 redis mq消息丢失问题
Merge pull request !332 from 与或非/issues/I5RFO2

芋道源码 2 years ago
parent
commit
f469c5caaf

+ 19 - 3
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java

@@ -8,8 +8,11 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
 import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
+import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob;
 import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.data.redis.connection.RedisServerCommands;
@@ -24,7 +27,7 @@ import org.springframework.data.redis.listener.ChannelTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
 import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
-import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.util.List;
 import java.util.Properties;
@@ -35,6 +38,7 @@ import java.util.Properties;
  * @author 芋道源码
  */
 @Slf4j
+@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
 public class YudaoMQAutoConfiguration {
 
@@ -69,9 +73,20 @@ public class YudaoMQAutoConfiguration {
         return container;
     }
 
+    /**
+     * 创建 Redis Stream 重新消费的任务
+     */
+    @Bean
+    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
+                                                                     RedisMQTemplate redisTemplate,
+                                                                     @Value("${spring.application.name}") String groupName,
+                                                                     RedissonClient redissonClient) {
+        return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
+    }
+
     /**
      * 创建 Redis Stream 集群消费的容器
-     *
+     * <p>
      * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
      */
     @Bean(initMethod = "start", destroyMethod = "stop")
@@ -99,7 +114,8 @@ public class YudaoMQAutoConfiguration {
             // 创建 listener 对应的消费者分组
             try {
                 redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
-            } catch (Exception ignore) {}
+            } catch (Exception ignore) {
+            }
             // 设置 listener 对应的 redisTemplate
             listener.setRedisMQTemplate(redisMQTemplate);
             // 创建 Consumer 对象

+ 80 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java

@@ -0,0 +1,80 @@
+package cn.iocoder.yudao.framework.mq.job;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.MapRecord;
+import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.core.StreamOperations;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 这个任务用于处理,crash 之后的消费者未消费完的消息
+ */
+@Slf4j
+@AllArgsConstructor
+public class RedisPendingMessageResendJob {
+
+    private static final String LOCK_KEY = "redis:pending:msg:lock";
+
+    private final List<AbstractStreamMessageListener<?>> listeners;
+    private final RedisMQTemplate redisTemplate;
+    private final String groupName;
+    private final RedissonClient redissonClient;
+
+    /**
+     * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
+     */
+    @Scheduled(cron = "35 * * * * ?")
+    public void messageResend() {
+        RLock lock = redissonClient.getLock(LOCK_KEY);
+        // 尝试加锁
+        if (lock.tryLock()) {
+            try {
+                execute();
+            } catch (Exception ex) {
+                log.error("[messageResend][执行异常]", ex);
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    private void execute() {
+        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
+        listeners.forEach(listener -> {
+            PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName);
+            // 每个消费者的 pending 队列消息数量
+            Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
+            pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
+                log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
+
+                // 从消费者的 pending 队列中读取消息
+                List<MapRecord<String, Object, Object>> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0")));
+                if (CollUtil.isEmpty(records)) {
+                    return;
+                }
+                for (MapRecord<String, Object, Object> record : records) {
+                    // 重新投递消息
+                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
+                            .ofObject(record.getValue()) // 设置内容
+                            .withStreamKey(listener.getStreamKey()));
+
+                    // ack 消息消费完成
+                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record);
+                }
+            });
+        });
+    }
+}