Browse Source

增强 mq starter 组件,支持 interceptor 拦截器机制

YunaiV 3 years ago
parent
commit
a231582637

+ 15 - 9
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.mq.config;
 
 import cn.hutool.system.SystemUtil;
 import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
 import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
 import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
@@ -9,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.stream.Consumer;
 import org.springframework.data.redis.connection.stream.ObjectRecord;
 import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -33,8 +33,12 @@ import java.util.List;
 public class YudaoMQAutoConfiguration {
 
     @Bean
-    public RedisMQTemplate redisMQTemplate(StringRedisTemplate stringRedisTemplate) {
-        return new RedisMQTemplate(stringRedisTemplate);
+    public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
+                                           List<RedisMessageInterceptor> interceptors) {
+        RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
+        // 添加拦截器
+        interceptors.forEach(redisMQTemplate::addInterceptor);
+        return redisMQTemplate;
     }
 
     // ========== 消费者相关 ==========
@@ -44,13 +48,14 @@ public class YudaoMQAutoConfiguration {
      */
     @Bean
     public RedisMessageListenerContainer redisMessageListenerContainer(
-            RedisConnectionFactory factory, List<AbstractChannelMessageListener<?>> listeners) {
+            RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
         // 创建 RedisMessageListenerContainer 对象
         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
         // 设置 RedisConnection 工厂。
-        container.setConnectionFactory(factory);
+        container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
         // 添加监听器
         listeners.forEach(listener -> {
+            listener.setRedisMQTemplate(redisMQTemplate);
             container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
             log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
                     listener.getChannel(), listener.getClass().getName());
@@ -65,7 +70,8 @@ public class YudaoMQAutoConfiguration {
      */
     @Bean(initMethod = "start", destroyMethod = "stop")
     public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
-            RedisTemplate<String, Object> redisTemplate, List<AbstractStreamMessageListener<?>> listeners) {
+            RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
+        RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
         // 第一步,创建 StreamMessageListenerContainer 容器
         // 创建 options 配置
         StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
@@ -74,8 +80,8 @@ public class YudaoMQAutoConfiguration {
                         .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
                         .build();
         // 创建 container 对象
-        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
-                redisTemplate.getRequiredConnectionFactory(), containerOptions);
+        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
+                StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
 
         // 第二步,注册监听器,消费对应的 Stream 主题
         String consumerName = buildConsumerName();
@@ -85,7 +91,7 @@ public class YudaoMQAutoConfiguration {
                 redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
             } catch (Exception ignore) {}
             // 设置 listener 对应的 redisTemplate
-            listener.setRedisTemplate(redisTemplate);
+            listener.setRedisMQTemplate(redisMQTemplate);
             // 创建 Consumer 对象
             Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
             // 设置 Consumer 消费进度,以最小消费进度为准

+ 49 - 4
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java

@@ -1,13 +1,19 @@
 package cn.iocoder.yudao.framework.mq.core;
 
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
 import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
 import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
 import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.springframework.data.redis.connection.stream.RecordId;
 import org.springframework.data.redis.connection.stream.StreamRecords;
 import org.springframework.data.redis.core.RedisTemplate;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Redis MQ 操作模板类
  *
@@ -16,7 +22,13 @@ import org.springframework.data.redis.core.RedisTemplate;
 @AllArgsConstructor
 public class RedisMQTemplate {
 
+    @Getter
     private final RedisTemplate<String, ?> redisTemplate;
+    /**
+     * 拦截器数组
+     */
+    @Getter
+    private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
 
     /**
      * 发送 Redis 消息,基于 Redis pub/sub 实现
@@ -24,7 +36,13 @@ public class RedisMQTemplate {
      * @param message 消息
      */
     public <T extends AbstractChannelMessage> void send(T message) {
-        redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
+        try {
+            sendMessageBefore(message);
+            // 发送消息
+            redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
+        } finally {
+            sendMessageAfter(message);
+        }
     }
 
     /**
@@ -34,9 +52,36 @@ public class RedisMQTemplate {
      * @return 消息记录的编号对象
      */
     public <T extends AbstractStreamMessage> RecordId send(T message) {
-        return redisTemplate.opsForStream().add(StreamRecords.newRecord()
-                .ofObject(JsonUtils.toJsonString(message)) // 设置内容
-                .withStreamKey(message.getStreamKey())); // 设置 stream key
+        try {
+            sendMessageBefore(message);
+            // 发送消息
+            return redisTemplate.opsForStream().add(StreamRecords.newRecord()
+                    .ofObject(JsonUtils.toJsonString(message)) // 设置内容
+                    .withStreamKey(message.getStreamKey())); // 设置 stream key
+        } finally {
+            sendMessageAfter(message);
+        }
+    }
+
+    /**
+     * 添加拦截器
+     *
+     * @param interceptor 拦截器
+     */
+    public void addInterceptor(RedisMessageInterceptor interceptor) {
+        interceptors.add(interceptor);
+    }
+
+    private void sendMessageBefore(AbstractRedisMessage message) {
+        // 正序
+        interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
+    }
+
+    private void sendMessageAfter(AbstractRedisMessage message) {
+        // 倒序
+        for (int i = interceptors.size() - 1; i >= 0; i--) {
+            interceptors.get(i).sendMessageAfter(message);
+        }
     }
 
 }

+ 26 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java

@@ -0,0 +1,26 @@
+package cn.iocoder.yudao.framework.mq.core.interceptor;
+
+import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+
+/**
+ * {@link AbstractRedisMessage} 消息拦截器
+ * 通过拦截器,作为插件机制,实现拓展。
+ * 例如说,多租户场景下的 MQ 消息处理
+ *
+ * @author 芋道源码
+ */
+public interface RedisMessageInterceptor {
+
+    default void sendMessageBefore(AbstractRedisMessage message) {
+    }
+
+    default void sendMessageAfter(AbstractRedisMessage message) {
+    }
+
+    default void consumeMessageBefore(AbstractRedisMessage message) {
+    }
+
+    default void consumeMessageAfter(AbstractRedisMessage message) {
+    }
+
+}

+ 33 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java

@@ -2,11 +2,16 @@ package cn.iocoder.yudao.framework.mq.core.pubsub;
 
 import cn.hutool.core.util.TypeUtil;
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 
 import java.lang.reflect.Type;
+import java.util.List;
 
 /**
  * Redis Pub/Sub 监听器抽象类,用于实现广播消费
@@ -25,6 +30,11 @@ public abstract class AbstractChannelMessageListener<T extends AbstractChannelMe
      * Redis Channel
      */
     private final String channel;
+    /**
+     * RedisMQTemplate
+     */
+    @Setter
+    private RedisMQTemplate redisMQTemplate;
 
     @SneakyThrows
     protected AbstractChannelMessageListener() {
@@ -44,7 +54,13 @@ public abstract class AbstractChannelMessageListener<T extends AbstractChannelMe
     @Override
     public final void onMessage(Message message, byte[] bytes) {
         T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
-        this.onMessage(messageObj);
+        try {
+            consumeMessageBefore(messageObj);
+            // 消费消息
+            this.onMessage(messageObj);
+        } finally {
+            consumeMessageAfter(messageObj);
+        }
     }
 
     /**
@@ -68,4 +84,20 @@ public abstract class AbstractChannelMessageListener<T extends AbstractChannelMe
         return (Class<T>) type;
     }
 
+    private void consumeMessageBefore(AbstractRedisMessage message) {
+        assert redisMQTemplate != null;
+        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
+        // 正序
+        interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
+    }
+
+    private void consumeMessageAfter(AbstractRedisMessage message) {
+        assert redisMQTemplate != null;
+        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
+        // 倒序
+        for (int i = interceptors.size() - 1; i >= 0; i--) {
+            interceptors.get(i).consumeMessageAfter(message);
+        }
+    }
+
 }

+ 2 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java

@@ -1,5 +1,6 @@
 package cn.iocoder.yudao.framework.mq.core.stream;
 
+import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
@@ -7,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
  *
  * @author 芋道源码
  */
-public abstract class AbstractStreamMessage {
+public abstract class AbstractStreamMessage extends AbstractRedisMessage {
 
     /**
      * 获得 Redis Stream Key

+ 36 - 11
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java

@@ -2,15 +2,18 @@ package cn.iocoder.yudao.framework.mq.core.stream;
 
 import cn.hutool.core.util.TypeUtil;
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.connection.stream.ObjectRecord;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.stream.StreamListener;
 
 import java.lang.reflect.Type;
+import java.util.List;
 
 /**
  * Redis Stream 监听器抽象类,用于实现集群消费
@@ -39,10 +42,10 @@ public abstract class AbstractStreamMessageListener<T extends AbstractStreamMess
     @Getter
     private String group;
     /**
-     * RedisTemplate
+     * RedisMQTemplate
      */
     @Setter
-    private RedisTemplate<String, ?> redisTemplate;
+    private RedisMQTemplate redisMQTemplate;
 
     @SneakyThrows
     protected AbstractStreamMessageListener() {
@@ -54,14 +57,20 @@ public abstract class AbstractStreamMessageListener<T extends AbstractStreamMess
     public void onMessage(ObjectRecord<String, String> message) {
         // 消费消息
         T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
-        this.onMessage(messageObj);
-        // ack 消息消费完成
-        redisTemplate.opsForStream().acknowledge(group, message);
-        // TODO 芋艿:需要额外考虑以下几个点:
-        // 1. 处理异常的情况
-        // 2. 发送日志;以及事务的结合
-        // 3. 消费日志;以及通用的幂等性
-        // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
+        try {
+            consumeMessageBefore(messageObj);
+            // 消费消息
+            this.onMessage(messageObj);
+            // ack 消息消费完成
+            redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
+            // TODO 芋艿:需要额外考虑以下几个点:
+            // 1. 处理异常的情况
+            // 2. 发送日志;以及事务的结合
+            // 3. 消费日志;以及通用的幂等性
+            // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
+        } finally {
+            consumeMessageAfter(messageObj);
+        }
     }
 
     /**
@@ -85,4 +94,20 @@ public abstract class AbstractStreamMessageListener<T extends AbstractStreamMess
         return (Class<T>) type;
     }
 
+    private void consumeMessageBefore(AbstractRedisMessage message) {
+        assert redisMQTemplate != null;
+        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
+        // 正序
+        interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
+    }
+
+    private void consumeMessageAfter(AbstractRedisMessage message) {
+        assert redisMQTemplate != null;
+        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
+        // 倒序
+        for (int i = interceptors.size() - 1; i >= 0; i--) {
+            interceptors.get(i).consumeMessageAfter(message);
+        }
+    }
+
 }