فهرست منبع

1. 升级 springboot 到最新的版本,解决 spring data redis 存储的 bug
2. 梳理 StreamMessageListenerContainer Bean 的创建

YunaiV 4 سال پیش
والد
کامیت
9c76fd4b69

+ 5 - 25
pom.xml

@@ -22,15 +22,15 @@
         <maven.compiler.target>${java.version}</maven.compiler.target>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
         <!-- 统一依赖管理 -->
-        <spring.boot.version>2.4.2</spring.boot.version>
+        <spring.boot.version>2.4.4</spring.boot.version>
         <!-- Web 相关 -->
         <knife4j.version>3.0.2</knife4j.version>
         <swagger-annotations.version>1.5.22</swagger-annotations.version>
         <!-- DB 相关 -->
         <mysql-connector-java.version>5.1.46</mysql-connector-java.version>
         <druid.version>1.2.4</druid.version>
-        <mybatis-plus.version>3.4.1</mybatis-plus.version>
-        <redisson.version>3.14.1</redisson.version>
+        <mybatis-plus.version>3.4.2</mybatis-plus.version>
+        <redisson.version>3.15.1</redisson.version>
         <!-- Config 配置中心相关 -->
         <apollo.version>1.7.0</apollo.version>
         <!-- 服务保障相关 -->
@@ -42,7 +42,7 @@
         <!-- 工具类相关 -->
         <lombok.version>1.16.14</lombok.version>
         <mapstruct.version>1.4.1.Final</mapstruct.version>
-        <hutool.version>5.5.6</hutool.version>
+        <hutool.version>5.6.1</hutool.version>
         <easyexcel.verion>2.2.7</easyexcel.verion>
         <velocity.version>2.2</velocity.version>
         <screw.version>1.0.5</screw.version>
@@ -249,27 +249,7 @@
 
         <dependency>
             <groupId>cn.hutool</groupId>
-            <artifactId>hutool-core</artifactId>
-            <version>${hutool.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>cn.hutool</groupId>
-            <artifactId>hutool-extra</artifactId>
-            <version>${hutool.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>cn.hutool</groupId>
-            <artifactId>hutool-captcha</artifactId>
-            <version>${hutool.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>cn.hutool</groupId>
-            <artifactId>hutool-http</artifactId>
-            <version>${hutool.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>cn.hutool</groupId>
-            <artifactId>hutool-crypto</artifactId>
+            <artifactId>hutool-all</artifactId>
             <version>${hutool.version}</version>
         </dependency>
 

+ 50 - 45
src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java

@@ -1,21 +1,22 @@
 package cn.iocoder.dashboard.framework.redis.config;
 
-import cn.hutool.core.net.NetUtil;
+import cn.hutool.system.SystemUtil;
 import cn.iocoder.dashboard.framework.redis.core.pubsub.AbstractChannelMessageListener;
 import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.stream.*;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.listener.ChannelTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
 import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
-import org.springframework.util.ErrorHandler;
 
-import java.time.Duration;
 import java.util.List;
 
 /**
@@ -25,6 +26,9 @@ import java.util.List;
 @Slf4j
 public class RedisConfig {
 
+    /**
+     * 创建 RedisTemplate Bean,使用 JSON 序列化方式
+     */
     @Bean
     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
         // 创建 RedisTemplate 对象
@@ -33,11 +37,16 @@ public class RedisConfig {
         template.setConnectionFactory(factory);
         // 使用 String 序列化方式,序列化 KEY 。
         template.setKeySerializer(RedisSerializer.string());
+        template.setHashKeySerializer(RedisSerializer.string());
         // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
         template.setValueSerializer(RedisSerializer.json());
+        template.setHashValueSerializer(RedisSerializer.json());
         return template;
     }
 
+    /**
+     * 创建 Redis Pub/Sub 广播消费的容器
+     */
     @Bean
     public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
                                                                        List<AbstractChannelMessageListener<?>> listeners) {
@@ -54,52 +63,48 @@ public class RedisConfig {
         return container;
     }
 
+    /**
+     * 创建 Redis Stream 集群消费的容器
+     *
+     * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
+     */
     @Bean(initMethod = "start", destroyMethod = "stop")
-    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
-            RedisConnectionFactory factory, List<AbstractStreamMessageListener<?>> listeners) {
-        // 创建配置对象
-        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
-                streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
-                    .builder()
-                    // 一次性最多拉取多少条消息
-                    .batchSize(10)
-                    // 执行消息轮询的执行器
-                    // .executor(this.threadPoolTaskExecutor)
-                    // 消息消费异常的handler
-                    .errorHandler(new ErrorHandler() {
-                        @Override
-                        public void handleError(Throwable t) {
-                            // throw new RuntimeException(t);
-                            t.printStackTrace();
-                        }
-                    })
-                    // 超时时间,设置为0,表示不超时(超时后会抛出异常)
-                    .pollTimeout(Duration.ZERO)
-                    // 序列化器
-                    .serializer(RedisSerializer.string())
-                    .targetType(String.class)
-                    .build();
+    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisTemplate<String, Object> redisTemplate,
+                                                                                                                    List<AbstractStreamMessageListener<?>> listeners) {
+        // 第一步,创建 StreamMessageListenerContainer 容器
+        // 创建 options 配置
+        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
+                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
+                        .batchSize(10) // 一次性最多拉取多少条消息
+                        .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
+                        .build();
+        // 创建 container 对象
+        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
+                redisTemplate.getRequiredConnectionFactory(), containerOptions);
 
-        // 根据配置对象创建监听容器对象
-        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
-                .create(factory, streamMessageListenerContainerOptions);
-
-        RedisTemplate<String, Object> redisTemplate = redisTemplate(factory);
-
-        // 使用监听容器对象开始监听消费(使用的是手动确认方式)
-        String consumerName = NetUtil.getLocalHostName(); // TODO 需要优化下,晚点参考下 rocketmq consumer 的
-        for (AbstractStreamMessageListener<?> listener : listeners) {
+        // 第二步,注册监听器,消费对应的 Stream 主题
+        String consumerName = buildConsumerName();
+        listeners.forEach(listener -> {
+            // 创建 listener 对应的消费者分组
             try {
                 redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
-            } catch (Exception ignore) {
-//                ignore.printStackTrace();
-            }
-
-            container.receive(Consumer.from(listener.getGroup(), consumerName),
-                    StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()), listener);
-        }
-
+            } catch (Exception ignore) {}
+            // 创建 Consumer 对象
+            Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
+            // 设置 Consumer 消费进度,以最小消费进度为准
+            StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
+            // 设置 Consumer 监听
+            StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
+                    .builder(streamOffset).consumer(consumer)
+                    .autoAcknowledge(false) // 不自动 ack
+                    .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
+            container.register(builder.build(), listener);
+        });
         return container;
     }
 
+    private static String buildConsumerName() {
+        return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
+    }
+
 }

+ 3 - 0
src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java

@@ -46,6 +46,9 @@ public abstract class AbstractStreamMessageListener<T extends StreamMessage>
     @Override
     public void onMessage(ObjectRecord<String, String> message) {
         System.out.println(message);
+        if (true) {
+//            throw new IllegalStateException("测试下");
+        }
     }
 
     /**

+ 1 - 1
src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java

@@ -31,7 +31,7 @@ public class RedisMessageUtils {
      * @param message 消息
      * @return 消息记录的编号对象
      */
-    public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, String> redisTemplate, T message) {
+    public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, ?> redisTemplate, T message) {
         return redisTemplate.opsForStream().add(StreamRecords.newRecord()
                 .ofObject(JsonUtils.toJsonString(message)) // 设置内容
                 .withStreamKey(message.getStreamKey())); // 设置 stream key

+ 11 - 5
src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java

@@ -9,6 +9,7 @@ import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage;
 import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage;
 import org.junit.jupiter.api.Test;
 import org.springframework.context.annotation.Import;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.StringRedisTemplate;
 
 import javax.annotation.Resource;
@@ -31,13 +32,18 @@ public class RedisStreamTest  {
         @Resource
         private StringRedisTemplate stringRedisTemplate;
 
+        @Resource
+        private RedisTemplate<String, Object> redisTemplate;
+
         @Test
         public void testProducer01() {
-            // 创建消息
-            SysSmsSendMessage message = new SysSmsSendMessage();
-            message.setMobile("15601691300").setTemplateCode("test");
-            // 发送消息
-            RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message);
+            for (int i = 0; i < 100; i++) {
+                // 创建消息
+                SysSmsSendMessage message = new SysSmsSendMessage();
+                message.setMobile("15601691300").setTemplateCode("test:" + i);
+                // 发送消息
+                RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message);
+            }
         }
 
         @Test