|
@@ -14,6 +14,8 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.data.redis.connection.RedisServerCommands;
|
|
|
import org.springframework.data.redis.connection.stream.Consumer;
|
|
@@ -57,6 +59,8 @@ public class YudaoMQAutoConfiguration {
|
|
|
* 创建 Redis Pub/Sub 广播消费的容器
|
|
|
*/
|
|
|
@Bean(initMethod = "start", destroyMethod = "stop")
|
|
|
+ @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
|
+ @ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户
|
|
|
public RedisMessageListenerContainer redisMessageListenerContainer(
|
|
|
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
|
|
// 创建 RedisMessageListenerContainer 对象
|
|
@@ -77,6 +81,8 @@ public class YudaoMQAutoConfiguration {
|
|
|
* 创建 Redis Stream 重新消费的任务
|
|
|
*/
|
|
|
@Bean
|
|
|
+ @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
|
+ @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
|
|
|
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
|
|
|
RedisMQTemplate redisTemplate,
|
|
|
@Value("${spring.application.name}") String groupName,
|
|
@@ -90,6 +96,8 @@ public class YudaoMQAutoConfiguration {
|
|
|
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
|
|
*/
|
|
|
@Bean(initMethod = "start", destroyMethod = "stop")
|
|
|
+ @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
|
+ @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
|
|
|
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
|
|
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
|
|
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|