Преглед изворни кода

fix: 解决 redis mq 消息丢失问题

gaibu пре 2 година
родитељ
комит
a6c92816f0

+ 20 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java

@@ -3,6 +3,8 @@ package cn.iocoder.yudao.framework.mq.scheduler;
 import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.connection.stream.Consumer;
@@ -17,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 这个定时器用于处理,crash 之后的消费者未消费完的消息
@@ -24,19 +27,35 @@ import java.util.Map;
 @Slf4j
 @EnableScheduling
 public class PendingMessageScheduler {
-
+    private static final String LOCK_KEY = "redis:pending:msg:lock";
     @Autowired
     private List<AbstractStreamMessageListener<?>> listeners;
     @Autowired
     private RedisMQTemplate redisTemplate;
     @Value("${spring.application.name}")
     private String groupName;
+    @Autowired
+    private RedissonClient redissonClient;
 
     /**
      * 一分钟执行一次
      */
     @Scheduled(fixedRate = 60 * 1000)
     public void processPendingMessage() {
+        final RLock lock = redissonClient.getLock(LOCK_KEY);
+        try {
+            // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁
+            boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS);
+            if (lockFlag) {
+                execute();
+            }
+        } catch (InterruptedException e) {
+            log.error("获取锁失败", e);
+        }
+
+    }
+
+    private void execute() {
         StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
 
         for (AbstractStreamMessageListener<?> listener : listeners) {