Browse Source

fix: 解决 redis mq 消息丢失问题

gaibu 2 năm trước cách đây
mục cha
commit
46f3790492

+ 14 - 7
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java

@@ -8,9 +8,11 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
 import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
-import cn.iocoder.yudao.framework.mq.scheduler.PendingMessageScheduler;
+import cn.iocoder.yudao.framework.mq.scheduler.RedisPendingMessageResendJob;
 import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.data.redis.connection.RedisServerCommands;
@@ -25,6 +27,7 @@ import org.springframework.data.redis.listener.ChannelTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
 import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.util.List;
 import java.util.Properties;
@@ -35,6 +38,7 @@ import java.util.Properties;
  * @author 芋道源码
  */
 @Slf4j
+@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
 public class YudaoMQAutoConfiguration {
 
@@ -70,17 +74,19 @@ public class YudaoMQAutoConfiguration {
     }
 
     /**
-     *
-     * @return
+     * 创建 Redis Stream 重新消费的任务
      */
     @Bean
-    public PendingMessageScheduler pendingMessageScheduler(){
-        return new PendingMessageScheduler();
+    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
+                                                                     RedisMQTemplate redisTemplate,
+                                                                     @Value("${spring.application.name}") String groupName,
+                                                                     RedissonClient redissonClient) {
+        return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
     }
 
     /**
      * 创建 Redis Stream 集群消费的容器
-     *
+     * <p>
      * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
      */
     @Bean(initMethod = "start", destroyMethod = "stop")
@@ -108,7 +114,8 @@ public class YudaoMQAutoConfiguration {
             // 创建 listener 对应的消费者分组
             try {
                 redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
-            } catch (Exception ignore) {}
+            } catch (Exception ignore) {
+            }
             // 设置 listener 对应的 redisTemplate
             listener.setRedisMQTemplate(redisMQTemplate);
             // 创建 Consumer 对象

+ 38 - 36
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java

@@ -1,12 +1,11 @@
 package cn.iocoder.yudao.framework.mq.scheduler;
 
+import cn.hutool.core.collection.CollUtil;
 import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.connection.stream.Consumer;
 import org.springframework.data.redis.connection.stream.MapRecord;
 import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
@@ -14,51 +13,53 @@ import org.springframework.data.redis.connection.stream.ReadOffset;
 import org.springframework.data.redis.connection.stream.StreamOffset;
 import org.springframework.data.redis.connection.stream.StreamRecords;
 import org.springframework.data.redis.core.StreamOperations;
-import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 /**
- * 这个定时器用于处理,crash 之后的消费者未消费完的消息
+ * 这个任务用于处理,crash 之后的消费者未消费完的消息
  */
 @Slf4j
-@EnableScheduling
-public class PendingMessageScheduler {
+public class RedisPendingMessageResendJob {
     private static final String LOCK_KEY = "redis:pending:msg:lock";
-    @Autowired
-    private List<AbstractStreamMessageListener<?>> listeners;
-    @Autowired
-    private RedisMQTemplate redisTemplate;
-    @Value("${spring.application.name}")
-    private String groupName;
-    @Autowired
-    private RedissonClient redissonClient;
+
+    private final List<AbstractStreamMessageListener<?>> listeners;
+    private final RedisMQTemplate redisTemplate;
+    private final String groupName;
+    private final RedissonClient redissonClient;
+
+    public RedisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient) {
+        this.listeners = listeners;
+        this.redisTemplate = redisTemplate;
+        this.groupName = groupName;
+        this.redissonClient = redissonClient;
+    }
 
     /**
-     * 一分钟执行一次
+     * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
      */
-    @Scheduled(fixedRate = 60 * 1000)
-    public void processPendingMessage() {
-        final RLock lock = redissonClient.getLock(LOCK_KEY);
-        try {
-            // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁
-            boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS);
-            if (lockFlag) {
+    @Scheduled(cron = "35 * * * * ?")
+    public void messageResend() {
+        RLock lock = redissonClient.getLock(LOCK_KEY);
+        log.info("[messageResend][尝试获取锁]");
+        // 尝试加锁
+        if (lock.tryLock()) {
+            try {
                 execute();
+            } catch (Exception ex) {
+                log.error("[messageResend][执行异常]", ex);
+            } finally {
+                lock.unlock();
             }
-        } catch (InterruptedException e) {
-            log.error("获取锁失败", e);
         }
-
     }
 
     private void execute() {
         StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
 
-        for (AbstractStreamMessageListener<?> listener : listeners) {
+        listeners.forEach(listener -> {
             PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName);
             // 每个消费者的pending消息数量
             Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
@@ -69,17 +70,18 @@ public class PendingMessageScheduler {
 
                 // 从消费者的pending队列中读取消息
                 List<MapRecord<String, Object, Object>> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0")));
+                if (CollUtil.isNotEmpty(retVal)) {
+                    for (MapRecord<String, Object, Object> record : retVal) {
+                        // 重新投递消息
+                        redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
+                                .ofObject(record.getValue()) // 设置内容
+                                .withStreamKey(listener.getStreamKey()));
 
-                for (MapRecord<String, Object, Object> record : retVal) {
-                    // 重新投递消息
-                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
-                            .ofObject(record.getValue()) // 设置内容
-                            .withStreamKey(listener.getStreamKey()));
-
-                    // ack 消息消费完成
-                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record);
+                        // ack 消息消费完成
+                        redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record);
+                    }
                 }
             });
-        }
+        });
     }
 }