Browse Source

!711 增加 RocketMQ、Kafka、RabbitMQ 消息队列的支持
Merge pull request !711 from 芋道源码/feature/mq-optimize

芋道源码 1 year ago
parent
commit
497d826ac4
61 changed files with 917 additions and 303 deletions
  1. 8 5
      yudao-dependencies/pom.xml
  2. 3 1
      yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java
  3. 16 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml
  4. 16 1
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java
  5. 37 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java
  6. 47 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java
  7. 23 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQInitializer.java
  8. 31 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQMessagePostProcessor.java
  9. 3 3
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/redis/TenantRedisMessageInterceptor.java
  10. 46 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java
  11. 53 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
  12. 36 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
  13. 269 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
  14. 2 0
      yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
  15. 19 2
      yudao-framework/yudao-spring-boot-starter-mq/pom.xml
  16. 0 21
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java
  17. 0 21
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java
  18. 1 3
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
  19. 29 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java
  20. 4 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java
  21. 4 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java
  22. 16 22
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
  23. 7 7
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
  24. 2 2
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
  25. 4 4
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
  26. 1 1
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
  27. 23 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java
  28. 6 6
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
  29. 23 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessage.java
  30. 6 6
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java
  31. 6 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java
  32. 0 62
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java
  33. 2 1
      yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  34. 1 0
      yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md
  35. 1 0
      yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md
  36. 1 0
      yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md
  37. 1 0
      yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md
  38. 31 0
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/CouponTakeByRegisterConsumer.java
  39. 0 29
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/UserCreateConsumer.java
  40. 4 0
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/package-info.java
  41. 0 29
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/coupon/UserCreateMessage.java
  42. 4 0
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/package-info.java
  43. 4 0
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/producer/package-info.java
  44. 4 0
      yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/package-info.java
  45. 21 0
      yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/user/MemberUserCreateMessage.java
  46. 4 3
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http
  47. 4 0
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/consumer/package-info.java
  48. 4 0
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/package-info.java
  49. 0 29
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/user/MemberUserCreateMessage.java
  50. 4 0
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/package-info.java
  51. 4 4
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java
  52. 4 4
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http
  53. 5 4
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java
  54. 5 3
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java
  55. 1 9
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java
  56. 1 9
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java
  57. 3 3
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java
  58. 3 3
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java
  59. 17 0
      yudao-server/src/main/resources/application-dev.yaml
  60. 17 0
      yudao-server/src/main/resources/application-local.yaml
  61. 26 6
      yudao-server/src/main/resources/application.yaml

+ 8 - 5
yudao-dependencies/pom.xml

@@ -30,6 +30,8 @@
         <mybatis-plus-join.version>1.4.7</mybatis-plus-join.version>
         <redisson.version>3.18.0</redisson.version>
         <dm8.jdbc.version>8.1.3.62</dm8.jdbc.version>
+        <!-- 消息队列 -->
+        <rocketmq-spring.version>2.2.3</rocketmq-spring.version>
         <!-- 服务保障相关 -->
         <lock4j.version>2.2.5</lock4j.version>
         <resilience4j.version>1.7.1</resilience4j.version>
@@ -96,11 +98,6 @@
                 <artifactId>yudao-spring-boot-starter-biz-operatelog</artifactId>
                 <version>${revision}</version>
             </dependency>
-            <dependency>
-                <groupId>cn.iocoder.boot</groupId>
-                <artifactId>yudao-spring-boot-starter-biz-trade</artifactId>
-                <version>${revision}</version>
-            </dependency>
             <dependency>
                 <groupId>cn.iocoder.boot</groupId>
                 <artifactId>yudao-spring-boot-starter-biz-dict</artifactId>
@@ -260,6 +257,12 @@
                 <version>${revision}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-spring-boot-starter</artifactId>
+                <version>${rocketmq-spring.version}</version>
+            </dependency>
+
             <!-- 服务保障相关 -->
             <dependency>
                 <groupId>cn.iocoder.boot</groupId>

+ 3 - 1
yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java

@@ -4,6 +4,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.io.Serializable;
+
 /**
  * Key Value 的键值对
  *
@@ -12,7 +14,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-public class KeyValue<K, V> {
+public class KeyValue<K, V> implements Serializable {
 
     private K key;
     private V value;

+ 16 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml

@@ -48,6 +48,22 @@
         <dependency>
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-mq</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
         </dependency>
 
         <!-- Test 测试相关 -->

+ 16 - 1
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java

@@ -6,7 +6,9 @@ import cn.iocoder.yudao.framework.redis.config.YudaoCacheProperties;
 import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect;
 import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor;
 import cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect;
-import cn.iocoder.yudao.framework.tenant.core.mq.TenantRedisMessageInterceptor;
+import cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer;
+import cn.iocoder.yudao.framework.tenant.core.mq.redis.TenantRedisMessageInterceptor;
+import cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQInitializer;
 import cn.iocoder.yudao.framework.tenant.core.redis.TenantRedisCacheManager;
 import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter;
 import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
@@ -18,6 +20,7 @@ import cn.iocoder.yudao.module.system.api.tenant.TenantApi;
 import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
 import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.boot.web.servlet.FilterRegistrationBean;
@@ -92,6 +95,18 @@ public class YudaoTenantAutoConfiguration {
         return new TenantRedisMessageInterceptor();
     }
 
+    @Bean
+    @ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
+    public TenantRabbitMQInitializer tenantRabbitMQInitializer() {
+        return new TenantRabbitMQInitializer();
+    }
+
+    @Bean
+    @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate")
+    public TenantRocketMQInitializer tenantRocketMQInitializer() {
+        return new TenantRocketMQInitializer();
+    }
+
     // ========== Job ==========
 
     @Bean

+ 37 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java

@@ -0,0 +1,37 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
+
+import cn.hutool.core.util.StrUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.env.EnvironmentPostProcessor;
+import org.springframework.core.env.ConfigurableEnvironment;
+
+/**
+ * 多租户的 Kafka 的 {@link EnvironmentPostProcessor} 实现类
+ *
+ * Kafka Producer 发送消息时,增加 {@link TenantKafkaProducerInterceptor} 拦截器
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {
+
+    private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";
+
+    @Override
+    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
+        // 添加 TenantKafkaProducerInterceptor 拦截器
+        try {
+            String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
+            if (StrUtil.isEmpty(value)) {
+                value = TenantKafkaProducerInterceptor.class.getName();
+            } else {
+                value += "," + TenantKafkaProducerInterceptor.class.getName();
+            }
+            environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
+        } catch (NoClassDefFoundError ignore) {
+            // 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖
+        }
+    }
+
+}

+ 47 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java

@@ -0,0 +1,47 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
+
+import cn.hutool.core.util.ReflectUtil;
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Headers;
+import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
+
+import java.util.Map;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Kafka 消息队列的多租户 {@link ProducerInterceptor} 实现类
+ *
+ * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
+ *
+ * @author 芋道源码
+ */
+public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
+
+    @Override
+    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
+        Long tenantId = TenantContextHolder.getTenantId();
+        if (tenantId != null) {
+            Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射
+            headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes());
+        }
+        return record;
+    }
+
+    @Override
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+}

+ 23 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQInitializer.java

@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
+
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * 多租户的 RabbitMQ 初始化器
+ *
+ * @author 芋道源码
+ */
+public class TenantRabbitMQInitializer implements BeanPostProcessor {
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        if (bean instanceof RabbitTemplate) {
+            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
+            rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor());
+        }
+        return bean;
+    }
+
+}

+ 31 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQMessagePostProcessor.java

@@ -0,0 +1,31 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessagePostProcessor;
+import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RabbitMQ 消息队列的多租户 {@link ProducerInterceptor} 实现类
+ *
+ * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
+ *
+ * @author 芋道源码
+ */
+public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor {
+
+    @Override
+    public Message postProcessMessage(Message message) throws AmqpException {
+        Long tenantId = TenantContextHolder.getTenantId();
+        if (tenantId != null) {
+            message.getMessageProperties().getHeaders().put(HEADER_TENANT_ID, tenantId);
+        }
+        return message;
+    }
+
+}

+ 3 - 3
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java → yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/redis/TenantRedisMessageInterceptor.java

@@ -1,8 +1,8 @@
-package cn.iocoder.yudao.framework.tenant.core.mq;
+package cn.iocoder.yudao.framework.tenant.core.mq.redis;
 
 import cn.hutool.core.util.StrUtil;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
 import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
 
 import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;

+ 46 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java

@@ -0,0 +1,46 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import cn.hutool.core.lang.Assert;
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
+
+import java.util.List;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类
+ *
+ * Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
+
+    @Override
+    public String hookName() {
+        return getClass().getSimpleName();
+    }
+
+    @Override
+    public void consumeMessageBefore(ConsumeMessageContext context) {
+        // 校验,消息必须是单条,不然设置租户可能不正确
+        List<MessageExt> messages = context.getMsgList();
+        Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
+        // 设置租户编号
+        String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
+        if (StrUtil.isNotEmpty(tenantId)) {
+            TenantContextHolder.setTenantId(Long.parseLong(tenantId));
+        }
+    }
+
+    @Override
+    public void consumeMessageAfter(ConsumeMessageContext context) {
+        TenantContextHolder.clear();
+    }
+
+}

+ 53 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java

@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * 多租户的 RocketMQ 初始化器
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQInitializer implements BeanPostProcessor {
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        if (bean instanceof DefaultRocketMQListenerContainer) {
+            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
+            initTenantConsumer(container.getConsumer());
+        } else if (bean instanceof RocketMQTemplate) {
+            RocketMQTemplate template = (RocketMQTemplate) bean;
+            initTenantProducer(template.getProducer());
+        }
+        return bean;
+    }
+
+    private void initTenantProducer(DefaultMQProducer producer) {
+        if (producer == null) {
+            return;
+        }
+        DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+        if (producerImpl == null) {
+            return;
+        }
+        producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
+    }
+
+    private void initTenantConsumer(DefaultMQPushConsumer consumer) {
+        if (consumer == null) {
+            return;
+        }
+        DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
+        if (consumerImpl == null) {
+            return;
+        }
+        consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
+    }
+
+}

+ 36 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java

@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
+ *
+ * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQSendMessageHook implements SendMessageHook {
+
+    @Override
+    public String hookName() {
+        return getClass().getSimpleName();
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext sendMessageContext) {
+        Long tenantId = TenantContextHolder.getTenantId();
+        if (tenantId == null) {
+            return;
+        }
+        sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext sendMessageContext) {
+    }
+
+}

+ 269 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java

@@ -0,0 +1,269 @@
+/*
+ * Copyright 2002-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.handler.invocation;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import org.springframework.core.DefaultParameterNameDiscoverer;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ParameterNameDiscoverer;
+import org.springframework.core.ResolvableType;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.HandlerMethod;
+import org.springframework.util.ObjectUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Extension of {@link HandlerMethod} that invokes the underlying method with
+ * argument values resolved from the current HTTP request through a list of
+ * {@link HandlerMethodArgumentResolver}.
+ *
+ * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中
+ * TODO 芋艿:持续跟进,看看有没新的拓展点
+ *
+ * @author Rossen Stoyanchev
+ * @author Juergen Hoeller
+ * @since 4.0
+ */
+public class InvocableHandlerMethod extends HandlerMethod {
+
+    private static final Object[] EMPTY_ARGS = new Object[0];
+
+    private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
+
+    private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
+
+    /**
+     * Create an instance from a {@code HandlerMethod}.
+     */
+    public InvocableHandlerMethod(HandlerMethod handlerMethod) {
+        super(handlerMethod);
+    }
+
+    /**
+     * Create an instance from a bean instance and a method.
+     */
+    public InvocableHandlerMethod(Object bean, Method method) {
+        super(bean, method);
+    }
+
+    /**
+     * Construct a new handler method with the given bean instance, method name and parameters.
+     * @param bean the object bean
+     * @param methodName the method name
+     * @param parameterTypes the method parameter types
+     * @throws NoSuchMethodException when the method cannot be found
+     */
+    public InvocableHandlerMethod(Object bean, String methodName, Class<?>... parameterTypes)
+            throws NoSuchMethodException {
+
+        super(bean, methodName, parameterTypes);
+    }
+
+    /**
+     * Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
+     */
+    public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
+        this.resolvers = argumentResolvers;
+    }
+
+    /**
+     * Set the ParameterNameDiscoverer for resolving parameter names when needed
+     * (e.g. default request attribute name).
+     * <p>Default is a {@link DefaultParameterNameDiscoverer}.
+     */
+    public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
+        this.parameterNameDiscoverer = parameterNameDiscoverer;
+    }
+
+    /**
+     * Invoke the method after resolving its argument values in the context of the given message.
+     * <p>Argument values are commonly resolved through
+     * {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
+     * The {@code providedArgs} parameter however may supply argument values to be used directly,
+     * i.e. without argument resolution.
+     * <p>Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
+     * resolved arguments.
+     * @param message the current message being processed
+     * @param providedArgs "given" arguments matched by type, not resolved
+     * @return the raw value returned by the invoked method
+     * @throws Exception raised if no suitable argument resolver can be found,
+     * or if the method raised an exception
+     * @see #getMethodArgumentValues
+     * @see #doInvoke
+     */
+    @Nullable
+    public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
+        Object[] args = getMethodArgumentValues(message, providedArgs);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Arguments: " + Arrays.toString(args));
+        }
+        // 注意:如下是本类的改动点!!!
+        // 情况一:无租户编号的情况
+        Long tenantId= parseTenantId(message);
+        if (tenantId == null) {
+            return doInvoke(args);
+        }
+        // 情况二:有租户的情况下
+        return TenantUtils.execute(tenantId, () -> doInvoke(args));
+    }
+
+    private Long parseTenantId(Message<?> message) {
+        Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
+        if (tenantId == null) {
+            return null;
+        }
+        if (tenantId instanceof Long) {
+            return (Long) tenantId;
+        }
+        if (tenantId instanceof Number) {
+            return ((Number) tenantId).longValue();
+        }
+        if (tenantId instanceof String) {
+            return Long.parseLong((String) tenantId);
+        }
+        if (tenantId instanceof byte[]) {
+            return Long.parseLong(new String((byte[]) tenantId));
+        }
+        throw new IllegalArgumentException("未知的数据类型:" + tenantId);
+    }
+
+    /**
+     * Get the method argument values for the current message, checking the provided
+     * argument values and falling back to the configured argument resolvers.
+     * <p>The resulting array will be passed into {@link #doInvoke}.
+     * @since 5.1.2
+     */
+    protected Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
+        MethodParameter[] parameters = getMethodParameters();
+        if (ObjectUtils.isEmpty(parameters)) {
+            return EMPTY_ARGS;
+        }
+
+        Object[] args = new Object[parameters.length];
+        for (int i = 0; i < parameters.length; i++) {
+            MethodParameter parameter = parameters[i];
+            parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
+            args[i] = findProvidedArgument(parameter, providedArgs);
+            if (args[i] != null) {
+                continue;
+            }
+            if (!this.resolvers.supportsParameter(parameter)) {
+                throw new MethodArgumentResolutionException(
+                        message, parameter, formatArgumentError(parameter, "No suitable resolver"));
+            }
+            try {
+                args[i] = this.resolvers.resolveArgument(parameter, message);
+            }
+            catch (Exception ex) {
+                // Leave stack trace for later, exception may actually be resolved and handled...
+                if (logger.isDebugEnabled()) {
+                    String exMsg = ex.getMessage();
+                    if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
+                        logger.debug(formatArgumentError(parameter, exMsg));
+                    }
+                }
+                throw ex;
+            }
+        }
+        return args;
+    }
+
+    /**
+     * Invoke the handler method with the given argument values.
+     */
+    @Nullable
+    protected Object doInvoke(Object... args) throws Exception {
+        try {
+            return getBridgedMethod().invoke(getBean(), args);
+        }
+        catch (IllegalArgumentException ex) {
+            assertTargetBean(getBridgedMethod(), getBean(), args);
+            String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
+            throw new IllegalStateException(formatInvokeError(text, args), ex);
+        }
+        catch (InvocationTargetException ex) {
+            // Unwrap for HandlerExceptionResolvers ...
+            Throwable targetException = ex.getTargetException();
+            if (targetException instanceof RuntimeException) {
+                throw (RuntimeException) targetException;
+            }
+            else if (targetException instanceof Error) {
+                throw (Error) targetException;
+            }
+            else if (targetException instanceof Exception) {
+                throw (Exception) targetException;
+            }
+            else {
+                throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
+            }
+        }
+    }
+
+    MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
+        return new AsyncResultMethodParameter(returnValue);
+    }
+
+    private class AsyncResultMethodParameter extends HandlerMethodParameter {
+
+        @Nullable
+        private final Object returnValue;
+
+        private final ResolvableType returnType;
+
+        public AsyncResultMethodParameter(@Nullable Object returnValue) {
+            super(-1);
+            this.returnValue = returnValue;
+            this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
+        }
+
+        protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
+            super(original);
+            this.returnValue = original.returnValue;
+            this.returnType = original.returnType;
+        }
+
+        @Override
+        public Class<?> getParameterType() {
+            if (this.returnValue != null) {
+                return this.returnValue.getClass();
+            }
+            if (!ResolvableType.NONE.equals(this.returnType)) {
+                return this.returnType.toClass();
+            }
+            return super.getParameterType();
+        }
+
+        @Override
+        public Type getGenericParameterType() {
+            return this.returnType.getType();
+        }
+
+        @Override
+        public AsyncResultMethodParameter clone() {
+            return new AsyncResultMethodParameter(this);
+        }
+    }
+
+}

+ 2 - 0
yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,2 @@
+org.springframework.boot.env.EnvironmentPostProcessor=\
+  cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor

+ 19 - 2
yudao-framework/yudao-spring-boot-starter-mq/pom.xml

@@ -12,7 +12,7 @@
     <packaging>jar</packaging>
 
     <name>${project.artifactId}</name>
-    <description>消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费</description>
+    <description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
     <url>https://github.com/YunaiV/ruoyi-vue-pro</url>
 
     <dependencies>
@@ -21,6 +21,23 @@
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-redis</artifactId>
         </dependency>
+
+        <!-- 消息队列相关 -->
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
 
-</project>
+</project>

+ 0 - 21
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java

@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Channel Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractChannelMessage extends AbstractRedisMessage {
-
-    /**
-     * 获得 Redis Channel
-     *
-     * @return Channel
-     */
-    @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
-    public abstract String getChannel();
-
-}

+ 0 - 21
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java

@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.stream;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Stream Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractStreamMessage extends AbstractRedisMessage {
-
-    /**
-     * 获得 Redis Stream Key
-     *
-     * @return Channel
-     */
-    @JsonIgnore // 避免序列化
-    public abstract String getStreamKey();
-
-}

+ 1 - 3
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java

@@ -1,6 +1,4 @@
 /**
- * 消息队列,基于 Redis 提供:
- * 1. 基于 Pub/Sub 实现广播消费
- * 2. 基于 Stream 实现集群消费
+ * 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
  */
 package cn.iocoder.yudao.framework.mq;

+ 29 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java

@@ -0,0 +1,29 @@
+package cn.iocoder.yudao.framework.mq.rabbitmq.config;
+
+import cn.hutool.core.util.ReflectUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.utils.SerializationUtils;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+
+import java.lang.reflect.Field;
+
+/**
+ * RabbitMQ 消息队列配置类
+ *
+ * @author 芋道源码
+ */
+@AutoConfiguration
+@Slf4j
+@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
+public class YudaoRabbitMQAutoConfiguration {
+
+    static {
+        // 强制设置 SerializationUtils 的 TRUST_ALL 为 true,避免 RabbitMQ Consumer 反序列化消息报错
+        // 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后
+        Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL");
+        ReflectUtil.removeFinalModify(trustAllField);
+        ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true);
+    }
+
+}

+ 4 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 占位符,无特殊逻辑
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq.core;

+ 4 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列,基于 RabbitMQ 提供
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq;

+ 16 - 22
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java

@@ -1,21 +1,20 @@
-package cn.iocoder.yudao.framework.mq.config;
+package cn.iocoder.yudao.framework.mq.redis.config;
 
 import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.system.SystemUtil;
 import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
-import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
 import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.data.redis.connection.RedisServerCommands;
 import org.springframework.data.redis.connection.stream.Consumer;
@@ -27,7 +26,6 @@ import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.data.redis.listener.ChannelTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
@@ -42,7 +40,7 @@ import java.util.Properties;
 @Slf4j
 @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
-public class YudaoMQAutoConfiguration {
+public class YudaoRedisMQAutoConfiguration {
 
     @Bean
     public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
@@ -59,10 +57,9 @@ public class YudaoMQAutoConfiguration {
      * 创建 Redis Pub/Sub 广播消费的容器
      */
     @Bean(initMethod = "start", destroyMethod = "stop")
-    @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
-    @ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户
+    @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     public RedisMessageListenerContainer redisMessageListenerContainer(
-            RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
+            RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
         // 创建 RedisMessageListenerContainer 对象
         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
         // 设置 RedisConnection 工厂。
@@ -81,9 +78,8 @@ public class YudaoMQAutoConfiguration {
      * 创建 Redis Stream 重新消费的任务
      */
     @Bean
-    @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
-    @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
-    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
+    @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
+    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
                                                                      RedisMQTemplate redisTemplate,
                                                                      @Value("${spring.application.name}") String groupName,
                                                                      RedissonClient redissonClient) {
@@ -92,14 +88,13 @@ public class YudaoMQAutoConfiguration {
 
     /**
      * 创建 Redis Stream 集群消费的容器
-     * <p>
-     * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
+     *
+     * 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
      */
     @Bean(initMethod = "start", destroyMethod = "stop")
-    @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
-    @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
+    @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
-            RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
+            RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
         RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
         checkRedisVersion(redisTemplate);
         // 第一步,创建 StreamMessageListenerContainer 容器
@@ -111,8 +106,7 @@ public class YudaoMQAutoConfiguration {
                         .build();
         // 创建 container 对象
         StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
-//                StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
-                DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
+                StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
 
         // 第二步,注册监听器,消费对应的 Stream 主题
         String consumerName = buildConsumerName();

+ 7 - 7
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java

@@ -1,10 +1,10 @@
-package cn.iocoder.yudao.framework.mq.core;
+package cn.iocoder.yudao.framework.mq.redis.core;
 
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import org.springframework.data.redis.connection.stream.RecordId;
@@ -35,7 +35,7 @@ public class RedisMQTemplate {
      *
      * @param message 消息
      */
-    public <T extends AbstractChannelMessage> void send(T message) {
+    public <T extends AbstractRedisChannelMessage> void send(T message) {
         try {
             sendMessageBefore(message);
             // 发送消息
@@ -51,7 +51,7 @@ public class RedisMQTemplate {
      * @param message 消息
      * @return 消息记录的编号对象
      */
-    public <T extends AbstractStreamMessage> RecordId send(T message) {
+    public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
         try {
             sendMessageBefore(message);
             // 发送消息

+ 2 - 2
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java

@@ -1,6 +1,6 @@
-package cn.iocoder.yudao.framework.mq.core.interceptor;
+package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
 
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
 
 /**
  * {@link AbstractRedisMessage} 消息拦截器

+ 4 - 4
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java

@@ -1,8 +1,8 @@
-package cn.iocoder.yudao.framework.mq.job;
+package cn.iocoder.yudao.framework.mq.redis.core.job;
 
 import cn.hutool.core.collection.CollUtil;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
@@ -33,7 +33,7 @@ public class RedisPendingMessageResendJob {
      */
     private static final int EXPIRE_TIME = 5 * 60;
 
-    private final List<AbstractStreamMessageListener<?>> listeners;
+    private final List<AbstractRedisStreamMessageListener<?>> listeners;
     private final RedisMQTemplate redisTemplate;
     private final String groupName;
     private final RedissonClient redissonClient;

+ 1 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java

@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.mq.core.message;
+package cn.iocoder.yudao.framework.mq.redis.core.message;
 
 import lombok.Data;
 

+ 23 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java

@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Redis Channel Message 抽象类
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
+
+    /**
+     * 获得 Redis Channel,默认使用类名
+     *
+     * @return Channel
+     */
+    @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
+    public String getChannel() {
+        return getClass().getSimpleName();
+    }
+
+}

+ 6 - 6
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java

@@ -1,10 +1,10 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
 
 import cn.hutool.core.util.TypeUtil;
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.springframework.data.redis.connection.Message;
@@ -20,7 +20,7 @@ import java.util.List;
  *
  * @author 芋道源码
  */
-public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener {
+public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
 
     /**
      * 消息类型
@@ -37,7 +37,7 @@ public abstract class AbstractChannelMessageListener<T extends AbstractChannelMe
     private RedisMQTemplate redisMQTemplate;
 
     @SneakyThrows
-    protected AbstractChannelMessageListener() {
+    protected AbstractRedisChannelMessageListener() {
         this.messageType = getMessageClass();
         this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
     }

+ 23 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessage.java

@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.mq.redis.core.stream;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Redis Stream Message 抽象类
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
+
+    /**
+     * 获得 Redis Stream Key,默认使用类名
+     *
+     * @return Channel
+     */
+    @JsonIgnore // 避免序列化
+    public String getStreamKey() {
+        return getClass().getSimpleName();
+    }
+
+}

+ 6 - 6
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java

@@ -1,10 +1,10 @@
-package cn.iocoder.yudao.framework.mq.core.stream;
+package cn.iocoder.yudao.framework.mq.redis.core.stream;
 
 import cn.hutool.core.util.TypeUtil;
 import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
@@ -22,7 +22,7 @@ import java.util.List;
  *
  * @author 芋道源码
  */
-public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage>
+public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
         implements StreamListener<String, ObjectRecord<String, String>> {
 
     /**
@@ -48,7 +48,7 @@ public abstract class AbstractStreamMessageListener<T extends AbstractStreamMess
     private RedisMQTemplate redisMQTemplate;
 
     @SneakyThrows
-    protected AbstractStreamMessageListener() {
+    protected AbstractRedisStreamMessageListener() {
         this.messageType = getMessageClass();
         this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
     }

+ 6 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java

@@ -0,0 +1,6 @@
+/**
+ * 消息队列,基于 Redis 提供:
+ * 1. 基于 Pub/Sub 实现广播消费
+ * 2. 基于 Stream 实现集群消费
+ */
+package cn.iocoder.yudao.framework.mq.redis;

+ 0 - 62
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java

@@ -1,62 +0,0 @@
-package org.springframework.data.redis.stream;
-
-import cn.hutool.core.util.ReflectUtil;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.stream.ByteRecord;
-import org.springframework.data.redis.connection.stream.ReadOffset;
-import org.springframework.data.redis.connection.stream.Record;
-import org.springframework.util.Assert;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
-/**
- * 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。
- * 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006
- * 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽!
- *
- * @author 芋道源码
- */
-public class DefaultStreamMessageListenerContainerX<K, V extends Record<K, ?>> extends DefaultStreamMessageListenerContainer<K, V> {
-
-    /**
-     * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
-     */
-    public static <K, V extends Record<K, ?>> StreamMessageListenerContainer<K, V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) {
-        Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
-        Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
-        return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
-    }
-
-    public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions<K, V> containerOptions) {
-        super(connectionFactory, containerOptions);
-    }
-
-    /**
-     * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
-     */
-    @Override
-    public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
-        return this.doRegisterX(getReadTaskX(streamRequest, listener));
-    }
-
-    @SuppressWarnings("unchecked")
-    private StreamPollTask<K, V> getReadTaskX(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
-        StreamPollTask<K, V> task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
-        // 修改 readFunction 方法
-        Function<ReadOffset, List<ByteRecord>> readFunction = (Function<ReadOffset, List<ByteRecord>>) ReflectUtil.getFieldValue(task, "readFunction");
-        ReflectUtil.setFieldValue(task, "readFunction", (Function<ReadOffset, List<ByteRecord>>) readOffset -> {
-            List<ByteRecord> records = readFunction.apply(readOffset);
-            //【重点】保证 records 不是空,避免 NPE 的问题!!!
-            return records != null ? records : Collections.emptyList();
-        });
-        return task;
-    }
-
-    private Subscription doRegisterX(Task task) {
-        return ReflectUtil.invoke(this, "doRegister", task);
-    }
-
-}
-

+ 2 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -1 +1,2 @@
-cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md

@@ -0,0 +1 @@
+<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md

@@ -0,0 +1 @@
+<http://www.iocoder.cn/Spring-Boot/Kafka/?yudao>

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md

@@ -0,0 +1 @@
+<http://www.iocoder.cn/Spring-Boot/RabbitMQ/?yudao>

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md

@@ -0,0 +1 @@
+<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

+ 31 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/CouponTakeByRegisterConsumer.java

@@ -0,0 +1,31 @@
+package cn.iocoder.yudao.module.promotion.mq.consumer.coupon;
+
+import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage;
+import cn.iocoder.yudao.module.promotion.service.coupon.CouponService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * 用户注册时,发送优惠劵的消费者,基 {@link MemberUserCreateMessage} 消息
+ *
+ * @author owen
+ */
+@Component
+@Slf4j
+public class CouponTakeByRegisterConsumer {
+
+    @Resource
+    private CouponService couponService;
+
+    @EventListener
+    @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
+    public void onMessage(MemberUserCreateMessage message) {
+        log.info("[onMessage][消息内容({})]", message);
+        couponService.takeCouponByRegister(message.getUserId());
+    }
+
+}

+ 0 - 29
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/UserCreateConsumer.java

@@ -1,29 +0,0 @@
-package cn.iocoder.yudao.module.promotion.mq.consumer.coupon;
-
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
-import cn.iocoder.yudao.module.promotion.mq.message.coupon.UserCreateMessage;
-import cn.iocoder.yudao.module.promotion.service.coupon.CouponService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-
-/**
- * 针对 {@link UserCreateMessage} 的消费者
- *
- * @author owen
- */
-@Component
-@Slf4j
-public class UserCreateConsumer extends AbstractStreamMessageListener<UserCreateMessage> {
-
-    @Resource
-    private CouponService couponService;
-
-    @Override
-    public void onMessage(UserCreateMessage message) {
-        log.info("[onMessage][消息内容({})]", message);
-        couponService.takeCouponByRegister(message.getUserId());
-    }
-
-}

+ 4 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的消费者
+ */
+package cn.iocoder.yudao.module.promotion.mq.consumer;

+ 0 - 29
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/coupon/UserCreateMessage.java

@@ -1,29 +0,0 @@
-package cn.iocoder.yudao.module.promotion.mq.message.coupon;
-
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * 会员用户创建消息
- *
- * @author owen
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class UserCreateMessage extends AbstractStreamMessage {
-
-    /**
-     * 用户编号
-     */
-    @NotNull(message = "用户编号不能为空")
-    private Long userId;
-
-    @Override
-    public String getStreamKey() {
-        return "member.user.create";
-    }
-
-}

+ 4 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的消息
+ */
+package cn.iocoder.yudao.module.promotion.mq.message;

+ 4 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/producer/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的生产者
+ */
+package cn.iocoder.yudao.module.promotion.mq.producer;

+ 4 - 0
yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的消息
+ */
+package cn.iocoder.yudao.module.member.message;

+ 21 - 0
yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/user/MemberUserCreateMessage.java

@@ -0,0 +1,21 @@
+package cn.iocoder.yudao.module.member.message.user;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * 会员用户创建消息
+ *
+ * @author owen
+ */
+@Data
+public class MemberUserCreateMessage {
+
+    /**
+     * 用户编号
+     */
+    @NotNull(message = "用户编号不能为空")
+    private Long userId;
+
+}

+ 4 - 3
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http

@@ -4,7 +4,7 @@ Content-Type: application/json
 tenant-id: {{appTenentId}}
 
 {
-  "mobile": "15601691300",
+  "mobile": "15601691388",
   "password": "admin123"
 }
 
@@ -14,7 +14,7 @@ Content-Type: application/json
 tenant-id: {{appTenentId}}
 
 {
-  "mobile": "15601691399",
+  "mobile": "15601691388",
   "scene": 1
 }
 
@@ -22,9 +22,10 @@ tenant-id: {{appTenentId}}
 POST {{appApi}}/member/auth/sms-login
 Content-Type: application/json
 tenant-id: {{appTenentId}}
+terminal: 30
 
 {
-  "mobile": "15601691301",
+  "mobile": "15601691388",
   "code": 9999
 }
 

+ 4 - 0
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/consumer/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的消费者
+ */
+package cn.iocoder.yudao.module.member.mq.consumer;

+ 4 - 0
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的消息
+ */
+package cn.iocoder.yudao.module.member.mq.message;

+ 0 - 29
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/user/MemberUserCreateMessage.java

@@ -1,29 +0,0 @@
-package cn.iocoder.yudao.module.member.mq.message.user;
-
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * 会员用户创建消息
- *
- * @author owen
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class MemberUserCreateMessage extends AbstractStreamMessage {
-
-    /**
-     * 用户编号
-     */
-    @NotNull(message = "用户编号不能为空")
-    private Long userId;
-
-    @Override
-    public String getStreamKey() {
-        return "member.user.create";
-    }
-
-}

+ 4 - 0
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/package-info.java

@@ -0,0 +1,4 @@
+/**
+ * 消息队列的生产者
+ */
+package cn.iocoder.yudao.module.member.mq.producer;

+ 4 - 4
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java

@@ -1,8 +1,8 @@
 package cn.iocoder.yudao.module.member.mq.producer.user;
 
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.module.member.mq.message.user.MemberUserCreateMessage;
+import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -17,7 +17,7 @@ import javax.annotation.Resource;
 public class MemberUserProducer {
 
     @Resource
-    private RedisMQTemplate redisMQTemplate;
+    private ApplicationContext applicationContext;
 
     /**
      * 发送 {@link MemberUserCreateMessage} 消息
@@ -25,7 +25,7 @@ public class MemberUserProducer {
      * @param userId 用户编号
      */
     public void sendUserCreateMessage(Long userId) {
-        redisMQTemplate.send(new MemberUserCreateMessage().setUserId(userId));
+        applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId));
     }
 
 }

+ 4 - 4
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http

@@ -6,9 +6,9 @@ tenant-id: {{adminTenentId}}
 
 {
   "templateCode": "test_01",
-  "mobile": "156016913900",
-  "params": {
-    "key01": "value01",
-    "key02": "value02"
+  "mobile": "15601691390",
+  "templateParams": {
+    "operation": "value01",
+    "code": "value02"
   }
 }

+ 5 - 4
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java

@@ -1,10 +1,10 @@
 package cn.iocoder.yudao.module.system.mq.consumer.mail;
 
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
 import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
-import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
 import cn.iocoder.yudao.module.system.service.mail.MailSendService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -16,12 +16,13 @@ import javax.annotation.Resource;
  */
 @Component
 @Slf4j
-public class MailSendConsumer extends AbstractStreamMessageListener<MailSendMessage> {
+public class MailSendConsumer {
 
     @Resource
     private MailSendService mailSendService;
 
-    @Override
+    @EventListener
+    @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
     public void onMessage(MailSendMessage message) {
         log.info("[onMessage][消息内容({})]", message);
         mailSendService.doSendMail(message);

+ 5 - 3
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java

@@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.sms;
 
 import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
 import cn.iocoder.yudao.module.system.service.sms.SmsSendService;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -15,12 +16,13 @@ import javax.annotation.Resource;
  */
 @Component
 @Slf4j
-public class SmsSendConsumer extends AbstractStreamMessageListener<SmsSendMessage> {
+public class SmsSendConsumer {
 
     @Resource
     private SmsSendService smsSendService;
 
-    @Override
+    @EventListener
+    @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
     public void onMessage(SmsSendMessage message) {
         log.info("[onMessage][消息内容({})]", message);
         smsSendService.doSendSms(message);

+ 1 - 9
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java

@@ -1,8 +1,6 @@
 package cn.iocoder.yudao.module.system.mq.message.mail;
 
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
@@ -13,8 +11,7 @@ import javax.validation.constraints.NotNull;
  * @author 芋道源码
  */
 @Data
-@EqualsAndHashCode(callSuper = true)
-public class MailSendMessage extends AbstractStreamMessage {
+public class MailSendMessage {
 
     /**
      * 邮件日志编号
@@ -47,9 +44,4 @@ public class MailSendMessage extends AbstractStreamMessage {
     @NotEmpty(message = "邮件内容不能为空")
     private String content;
 
-    @Override
-    public String getStreamKey() {
-        return "system.mail.send";
-    }
-
 }

+ 1 - 9
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java

@@ -1,9 +1,7 @@
 package cn.iocoder.yudao.module.system.mq.message.sms;
 
 import cn.iocoder.yudao.framework.common.core.KeyValue;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -14,8 +12,7 @@ import java.util.List;
  * @author 芋道源码
  */
 @Data
-@EqualsAndHashCode(callSuper = true)
-public class SmsSendMessage extends AbstractStreamMessage {
+public class SmsSendMessage {
 
     /**
      * 短信日志编号
@@ -42,9 +39,4 @@ public class SmsSendMessage extends AbstractStreamMessage {
      */
     private List<KeyValue<String, Object>> templateParams;
 
-    @Override
-    public String getStreamKey() {
-        return "system.sms.send";
-    }
-
 }

+ 3 - 3
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java

@@ -1,8 +1,8 @@
 package cn.iocoder.yudao.module.system.mq.producer.mail;
 
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -18,7 +18,7 @@ import javax.annotation.Resource;
 public class MailProducer {
 
     @Resource
-    private RedisMQTemplate redisMQTemplate;
+    private ApplicationContext applicationContext;
 
     /**
      * 发送 {@link MailSendMessage} 消息
@@ -35,7 +35,7 @@ public class MailProducer {
         MailSendMessage message = new MailSendMessage()
                 .setLogId(sendLogId).setMail(mail).setAccountId(accountId)
                 .setNickname(nickname).setTitle(title).setContent(content);
-        redisMQTemplate.send(message);
+        applicationContext.publishEvent(message);
     }
 
 }

+ 3 - 3
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java

@@ -1,9 +1,9 @@
 package cn.iocoder.yudao.module.system.mq.producer.sms;
 
 import cn.iocoder.yudao.framework.common.core.KeyValue;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
 import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -20,7 +20,7 @@ import java.util.List;
 public class SmsProducer {
 
     @Resource
-    private RedisMQTemplate redisMQTemplate;
+    private ApplicationContext applicationContext;
 
     /**
      * 发送 {@link SmsSendMessage} 消息
@@ -35,7 +35,7 @@ public class SmsProducer {
                                    Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
         SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
         message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
-        redisMQTemplate.send(message);
+        applicationContext.publishEvent(message);
     }
 
 }

+ 17 - 0
yudao-server/src/main/resources/application-dev.yaml

@@ -94,6 +94,23 @@ spring:
     jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置
       initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
 
+--- #################### 消息队列相关 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
+
+spring:
+  # RabbitMQ 配置项,对应 RabbitProperties 配置类
+  rabbitmq:
+    host: 127.0.0.1 # RabbitMQ 服务的地址
+    port: 5672 # RabbitMQ 服务的端口
+    username: guest # RabbitMQ 服务的账号
+    password: guest # RabbitMQ 服务的密码
+  # Kafka 配置项,对应 KafkaProperties 配置类
+  kafka:
+    bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
+
 --- #################### 服务保障相关配置 ####################
 
 # Lock4j 配置项

+ 17 - 0
yudao-server/src/main/resources/application-local.yaml

@@ -109,6 +109,23 @@ spring:
     jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置
       initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
 
+--- #################### 消息队列相关 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
+
+spring:
+  # RabbitMQ 配置项,对应 RabbitProperties 配置类
+  rabbitmq:
+    host: 127.0.0.1 # RabbitMQ 服务的地址
+    port: 5672 # RabbitMQ 服务的端口
+    username: guest # RabbitMQ 服务的账号
+    password: guest # RabbitMQ 服务的密码
+  # Kafka 配置项,对应 KafkaProperties 配置类
+  kafka:
+    bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
+
 --- #################### 服务保障相关配置 ####################
 
 # Lock4j 配置项

+ 26 - 6
yudao-server/src/main/resources/application.yaml

@@ -106,6 +106,32 @@ aj:
     req-check-minute-limit: 60 # check 接口一分钟内请求数限制
     req-verify-minute-limit: 60 # verify 接口一分钟内请求数限制
 
+--- #################### 消息队列相关 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+  # Producer 配置项
+  producer:
+    group: ${spring.application.name}_PRODUCER # 生产者分组
+
+spring:
+  # Kafka 配置项,对应 KafkaProperties 配置类
+  kafka:
+    # Kafka Producer 配置项
+    producer:
+      acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
+      retries: 3 # 发送失败时,重试发送的次数
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
+    # Kafka Consumer 配置项
+    consumer:
+      auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      properties:
+        spring.json.trusted.packages: '*'
+    # Kafka Consumer Listener 监听器配置
+    listener:
+      missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
+
 --- #################### 芋道相关配置 ####################
 
 yudao:
@@ -145,12 +171,6 @@ yudao:
       - cn.iocoder.yudao.module.pay.enums.ErrorCodeConstants
       - cn.iocoder.yudao.module.system.enums.ErrorCodeConstants
       - cn.iocoder.yudao.module.mp.enums.ErrorCodeConstants
-  mq:
-    redis:
-      pubsub:
-        enable: false # 是否开启 Redis pubsub 广播消费,默认为 true。这里设置成 false,可以按需开启
-      stream:
-        enable: false  # 是否开启 Redis stream 集群消费,默认为 true。这里设置成 false,可以按需开启
   tenant: # 多租户相关配置项
     enable: true
     ignore-urls: