Ver Fonte

!749 增加 WebSocket 的全新支持
Merge pull request !749 from 芋道源码/feature/websocket

芋道源码 há 1 ano atrás
pai
commit
ae763f727f
59 ficheiros alterados com 1705 adições e 573 exclusões
  1. 6 0
      yudao-dependencies/pom.xml
  2. 3 16
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java
  3. 31 0
      yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQProducerAutoConfiguration.java
  4. 2 1
      yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  5. 7 0
      yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java
  6. 0 2
      yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java
  7. 6 2
      yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java
  8. 16 8
      yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java
  9. 49 2
      yudao-framework/yudao-spring-boot-starter-websocket/pom.xml
  10. 0 14
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java
  11. 13 8
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java
  12. 152 9
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java
  13. 0 24
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java
  14. 0 9
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java
  15. 0 24
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java
  16. 0 36
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java
  17. 0 31
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java
  18. 0 49
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java
  19. 83 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java
  20. 31 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java
  21. 29 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java
  22. 42 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java
  23. 24 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java
  24. 104 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java
  25. 52 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java
  26. 35 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java
  27. 28 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java
  28. 67 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java
  29. 20 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java
  30. 37 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java
  31. 39 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java
  32. 62 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java
  33. 34 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java
  34. 23 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java
  35. 57 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java
  36. 35 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java
  37. 30 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java
  38. 61 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java
  39. 49 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java
  40. 53 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java
  41. 125 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java
  42. 67 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java
  43. 3 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java
  44. 1 0
      yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md
  45. 54 0
      yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java
  46. 5 5
      yudao-module-infra/yudao-module-infra-biz/pom.xml
  47. 34 0
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java
  48. 48 0
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java
  49. 0 45
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java
  50. 0 16
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java
  51. 0 86
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java
  52. 0 178
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java
  53. 27 0
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java
  54. 24 0
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java
  55. 2 1
      yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java
  56. 3 2
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java
  57. 19 0
      yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java
  58. 2 2
      yudao-server/src/main/resources/application-local.yaml
  59. 11 3
      yudao-server/src/main/resources/application.yaml

+ 6 - 0
yudao-dependencies/pom.xml

@@ -175,6 +175,12 @@
                 <version>${revision}</version>
             </dependency>
 
+            <dependency>
+                <groupId>cn.iocoder.boot</groupId>
+                <artifactId>yudao-spring-boot-starter-websocket</artifactId>
+                <version>${revision}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.github.xiaoymin</groupId>
                 <artifactId>knife4j-openapi3-spring-boot-starter</artifactId>

+ 3 - 16
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java → yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java

@@ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil;
 import cn.hutool.system.SystemUtil;
 import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
 import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
 import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
 import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
 import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
@@ -23,7 +22,6 @@ import org.springframework.data.redis.connection.stream.ReadOffset;
 import org.springframework.data.redis.connection.stream.StreamOffset;
 import org.springframework.data.redis.core.RedisCallback;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.data.redis.listener.ChannelTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
@@ -33,30 +31,19 @@ import java.util.List;
 import java.util.Properties;
 
 /**
- * 消息队列配置类
+ * Redis 消息队列 Consumer 配置类
  *
  * @author 芋道源码
  */
 @Slf4j
 @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
-public class YudaoRedisMQAutoConfiguration {
-
-    @Bean
-    public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
-                                           List<RedisMessageInterceptor> interceptors) {
-        RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
-        // 添加拦截器
-        interceptors.forEach(redisMQTemplate::addInterceptor);
-        return redisMQTemplate;
-    }
-
-    // ========== 消费者相关 ==========
+public class YudaoRedisMQConsumerAutoConfiguration {
 
     /**
      * 创建 Redis Pub/Sub 广播消费的容器
      */
-    @Bean(initMethod = "start", destroyMethod = "stop")
+    @Bean
     @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     public RedisMessageListenerContainer redisMessageListenerContainer(
             RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {

+ 31 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQProducerAutoConfiguration.java

@@ -0,0 +1,31 @@
+package cn.iocoder.yudao.framework.mq.redis.config;
+
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.redis.core.StringRedisTemplate;
+
+import java.util.List;
+
+/**
+ * Redis 消息队列 Producer 配置类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
+public class YudaoRedisMQProducerAutoConfiguration {
+
+    @Bean
+    public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
+                                           List<RedisMessageInterceptor> interceptors) {
+        RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
+        // 添加拦截器
+        interceptors.forEach(redisMQTemplate::addInterceptor);
+        return redisMQTemplate;
+    }
+
+}

+ 2 - 1
yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -1,2 +1,3 @@
-cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQProducerAutoConfiguration
+cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration
 cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration

+ 7 - 0
yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java

@@ -19,6 +19,13 @@ public class SecurityProperties {
      */
     @NotEmpty(message = "Token Header 不能为空")
     private String tokenHeader = "Authorization";
+    /**
+     * HTTP 请求时,访问令牌的请求参数
+     *
+     * 初始目的:解决 WebSocket 无法通过 header 传参,只能通过 token 参数拼接
+     */
+    @NotEmpty(message = "Token Parameter 不能为空")
+    private String tokenParameter = "token";
 
     /**
      * mock 模式的开关

+ 0 - 2
yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java

@@ -129,8 +129,6 @@ public class YudaoWebSecurityConfigurerAdapter {
                 .antMatchers(buildAppApi("/**")).permitAll()
                 // 1.5 验证码captcha 允许匿名访问
                 .antMatchers("/captcha/get", "/captcha/check").permitAll()
-                // 1.6 webSocket 允许匿名访问
-                .antMatchers("/websocket/message").permitAll()
                 // ②:每个项目的自定义规则
                 .and().authorizeRequests(registry -> // 下面,循环设置自定义规则
                         authorizeRequestsCustomizers.forEach(customizer -> customizer.customize(registry)))

+ 6 - 2
yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java

@@ -41,7 +41,8 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
     @SuppressWarnings("NullableProblems")
     protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
             throws ServletException, IOException {
-        String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader());
+        String token = SecurityFrameworkUtils.obtainAuthorization(request,
+                securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
         if (StrUtil.isNotEmpty(token)) {
             Integer userType = WebFrameworkUtils.getLoginUserType(request);
             try {
@@ -74,7 +75,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter {
                 return null;
             }
             // 用户类型不匹配,无权限
-            if (ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
+            // 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型
+            // TODO 芋艿:ws 要不要区分开?
+            if (userType != null
+                    && ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
                 throw new AccessDeniedException("错误的用户类型");
             }
             // 构建登录用户

+ 16 - 8
yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java

@@ -1,5 +1,6 @@
 package cn.iocoder.yudao.framework.security.core.util;
 
+import cn.hutool.core.util.StrUtil;
 import cn.iocoder.yudao.framework.security.core.LoginUser;
 import cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils;
 import org.springframework.lang.Nullable;
@@ -20,6 +21,9 @@ import java.util.Collections;
  */
 public class SecurityFrameworkUtils {
 
+    /**
+     * HEADER 认证头 value 的前缀
+     */
     public static final String AUTHORIZATION_BEARER = "Bearer";
 
     private SecurityFrameworkUtils() {}
@@ -28,19 +32,23 @@ public class SecurityFrameworkUtils {
      * 从请求中,获得认证 Token
      *
      * @param request 请求
-     * @param header 认证 Token 对应的 Header 名字
+     * @param headerName 认证 Token 对应的 Header 名字
+     * @param parameterName 认证 Token 对应的 Parameter 名字
      * @return 认证 Token
      */
-    public static String obtainAuthorization(HttpServletRequest request, String header) {
-        String authorization = request.getHeader(header);
-        if (!StringUtils.hasText(authorization)) {
-            return null;
+    public static String obtainAuthorization(HttpServletRequest request,
+                                             String headerName, String parameterName) {
+        // 1. 获得 Token。优先级:Header > Parameter
+        String token = request.getHeader(headerName);
+        if (StrUtil.isEmpty(token)) {
+            token = request.getParameter(parameterName);
         }
-        int index = authorization.indexOf(AUTHORIZATION_BEARER + " ");
-        if (index == -1) { // 未找到
+        if (!StringUtils.hasText(token)) {
             return null;
         }
-        return authorization.substring(index + 7).trim();
+        // 2. 去除 Token 中带的 Bearer
+        int index = token.indexOf(AUTHORIZATION_BEARER + " ");
+        return index >= 0 ? token.substring(index + 7).trim() : token;
     }
 
     /**

+ 49 - 2
yudao-framework/yudao-spring-boot-starter-websocket/pom.xml

@@ -12,26 +12,73 @@
     <packaging>jar</packaging>
 
     <name>${project.artifactId}</name>
-    <description>WebSocket</description>
+    <description>WebSocket 框架,支持多节点的广播</description>
     <url>https://github.com/YunaiV/ruoyi-vue-pro</url>
 
 
     <dependencies>
-
         <dependency>
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-common</artifactId>
         </dependency>
 
+        <!-- Web 相关 -->
         <dependency>
+            <!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢?
+                 因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。
+                 如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。
+            -->
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-security</artifactId>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
+
+        <!-- Web 相关 -->
+        <dependency>
+            <!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢?
+                 因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。
+                 如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。
+            -->
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-security</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- 消息队列相关 -->
+        <dependency>
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-mq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <!-- 业务组件 -->
+        <dependency>
+            <!-- 为什么要依赖 tenant 组件?
+                因为广播某个类型的用户时候,需要根据租户过滤下,避免广播到别的租户!
+            -->
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-biz-tenant</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
 </project>

+ 0 - 14
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java

@@ -1,14 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.config;
-
-import cn.iocoder.yudao.framework.websocket.core.UserHandshakeInterceptor;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.web.socket.server.HandshakeInterceptor;
-
-@EnableConfigurationProperties(WebSocketProperties.class)
-public class WebSocketHandlerConfig {
-    @Bean
-    public HandshakeInterceptor handshakeInterceptor() {
-        return new UserHandshakeInterceptor();
-    }
-}

+ 13 - 8
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java

@@ -4,6 +4,9 @@ import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.validation.annotation.Validated;
 
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
 /**
  * WebSocket 配置项
  *
@@ -15,15 +18,17 @@ import org.springframework.validation.annotation.Validated;
 public class WebSocketProperties {
 
     /**
-     * 路径
-     */
-    private String path = "";
-    /**
-     * 默认最多允许同时在线用户数
+     * WebSocket 的连接路径
      */
-    private int maxOnlineCount = 0;
+    @NotEmpty(message = "WebSocket 的连接路径不能为空")
+    private String path = "/ws";
+
     /**
-     * 是否保存session
+     * 消息发送器的类型
+     *
+     * 可选值:local、redis、rocketmq、kafka、rabbitmq
      */
-    private boolean sessionMap = true;
+    @NotNull(message = "WebSocket 的消息发送者不能为空")
+    private String senderType = "local";
+
 }

+ 152 - 9
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java

@@ -1,11 +1,34 @@
 package cn.iocoder.yudao.framework.websocket.config;
 
+import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
+import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer;
+import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer;
+import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer;
+import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
+import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
 import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
 import org.springframework.web.socket.server.HandshakeInterceptor;
 
@@ -16,19 +39,139 @@ import java.util.List;
  *
  * @author xingyu4j
  */
-@AutoConfiguration
-// 允许使用 yudao.websocket.enable=false 禁用websocket
-@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true)
+@AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是,需要保证 RedisWebSocketMessageConsumer 先创建,才能创建 RedisMessageListenerContainer
+@EnableWebSocket // 开启 websocket
+@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
 @EnableConfigurationProperties(WebSocketProperties.class)
 public class YudaoWebSocketAutoConfiguration {
+
     @Bean
-    @ConditionalOnMissingBean
-    public WebSocketConfigurer webSocketConfigurer(List<HandshakeInterceptor> handshakeInterceptor,
+    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,
                                                    WebSocketHandler webSocketHandler,
                                                    WebSocketProperties webSocketProperties) {
-
         return registry -> registry
+                // 添加 WebSocketHandler
                 .addHandler(webSocketHandler, webSocketProperties.getPath())
-                .addInterceptors(handshakeInterceptor.toArray(new HandshakeInterceptor[0]));
+                .addInterceptors(handshakeInterceptors)
+                // 允许跨域,否则前端连接会直接断开
+                .setAllowedOriginPatterns("*");
+    }
+
+    @Bean
+    public HandshakeInterceptor handshakeInterceptor() {
+        return new LoginUserHandshakeInterceptor();
+    }
+
+    @Bean
+    public WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager,
+                                             List<? extends WebSocketMessageListener<?>> messageListeners) {
+        // 1. 创建 JsonWebSocketMessageHandler 对象,处理消息
+        JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners);
+        // 2. 创建 WebSocketSessionHandlerDecorator 对象,处理连接
+        return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager);
+    }
+
+    @Bean
+    public WebSocketSessionManager webSocketSessionManager() {
+        return new WebSocketSessionManagerImpl();
+    }
+
+    // ==================== Sender 相关 ====================
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true)
+    public class LocalWebSocketMessageSenderConfiguration {
+
+        @Bean
+        public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) {
+            return new LocalWebSocketMessageSender(sessionManager);
+        }
+
+    }
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true)
+    public class RedisWebSocketMessageSenderConfiguration {
+
+        @Bean
+        public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,
+                                                                       RedisMQTemplate redisMQTemplate) {
+            return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
+        }
+
+        @Bean
+        public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(
+                RedisWebSocketMessageSender redisWebSocketMessageSender) {
+            return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);
+        }
+
+    }
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true)
+    public class RocketMQWebSocketMessageSenderConfiguration {
+
+        @Bean
+        public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(
+                WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,
+                @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) {
+            return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);
+        }
+
+        @Bean
+        public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(
+                RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {
+            return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);
+        }
+
     }
-}
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true)
+    public class RabbitMQWebSocketMessageSenderConfiguration {
+
+        @Bean
+        public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(
+                WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,
+                TopicExchange websocketTopicExchange) {
+            return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);
+        }
+
+        @Bean
+        public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(
+                RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {
+            return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);
+        }
+
+        /**
+         * 创建 Topic Exchange
+         */
+        @Bean
+        public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) {
+            return new TopicExchange(exchange,
+                    true,  // durable: 是否持久化
+                    false);  // exclusive: 是否排它
+        }
+
+    }
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true)
+    public class KafkaWebSocketMessageSenderConfiguration {
+
+        @Bean
+        public KafkaWebSocketMessageSender kafkaWebSocketMessageSender(
+                WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,
+                @Value("${yudao.websocket.sender-kafka.topic}") String topic) {
+            return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);
+        }
+
+        @Bean
+        public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(
+                KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {
+            return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);
+        }
+
+    }
+
+}

+ 0 - 24
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java

@@ -1,24 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import cn.iocoder.yudao.framework.security.core.LoginUser;
-import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
-import org.springframework.http.server.ServerHttpRequest;
-import org.springframework.http.server.ServerHttpResponse;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.server.HandshakeInterceptor;
-
-import java.util.Map;
-
-public class UserHandshakeInterceptor implements HandshakeInterceptor {
-    @Override
-    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
-        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
-        attributes.put(WebSocketKeyDefine.LOGIN_USER, loginUser);
-        return true;
-    }
-
-    @Override
-    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
-
-    }
-}

+ 0 - 9
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java

@@ -1,9 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-
-import lombok.Data;
-
-@Data
-public class WebSocketKeyDefine {
-    public static final String LOGIN_USER ="LOGIN_USER";
-}

+ 0 - 24
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java

@@ -1,24 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.List;
-
-@Data
-@Accessors(chain = true)
-public class WebSocketMessageDO {
-    /**
-     * 接收消息的seesion
-     */
-    private List<Object> seesionKeyList;
-    /**
-     * 发送消息
-     */
-    private String msgText;
-
-    public static WebSocketMessageDO build(List<Object> seesionKeyList, String msgText) {
-        return new WebSocketMessageDO().setMsgText(msgText).setSeesionKeyList(seesionKeyList);
-    }
-
-}

+ 0 - 36
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java

@@ -1,36 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import org.springframework.web.socket.WebSocketSession;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public final class WebSocketSessionHandler {
-    private WebSocketSessionHandler() {
-    }
-
-    private static final Map<String, WebSocketSession> SESSION_MAP = new ConcurrentHashMap<>();
-
-    public static void addSession(Object sessionKey, WebSocketSession session) {
-        SESSION_MAP.put(sessionKey.toString(), session);
-    }
-
-    public static void removeSession(Object sessionKey) {
-        SESSION_MAP.remove(sessionKey.toString());
-    }
-
-    public static WebSocketSession getSession(Object sessionKey) {
-        return SESSION_MAP.get(sessionKey.toString());
-    }
-
-    public static Collection<WebSocketSession> getSessions() {
-        return SESSION_MAP.values();
-    }
-
-    public static Set<String> getSessionKeys() {
-        return SESSION_MAP.keySet();
-    }
-
-}

+ 0 - 31
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java

@@ -1,31 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.io.IOException;
-
-@Slf4j
-public class WebSocketUtils {
-    public static boolean sendMessage(WebSocketSession seesion, String message) {
-        if (seesion == null) {
-            log.error("seesion 不存在");
-            return false;
-        }
-        if (seesion.isOpen()) {
-            try {
-                seesion.sendMessage(new TextMessage(message));
-            } catch (IOException e) {
-                log.error("WebSocket 消息发送异常 Session={} | msg= {} | exception={}", seesion, message, e);
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public static boolean sendMessage(Object sessionKey, String message) {
-        WebSocketSession session = WebSocketSessionHandler.getSession(sessionKey);
-        return sendMessage(session, message);
-    }
-}

+ 0 - 49
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java

@@ -1,49 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import cn.iocoder.yudao.framework.security.core.LoginUser;
-import org.springframework.web.socket.CloseStatus;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.WebSocketSession;
-import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
-
-public class YudaoWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
-    public YudaoWebSocketHandlerDecorator(WebSocketHandler delegate) {
-        super(delegate);
-    }
-
-    /**
-     * websocket 连接时执行的动作
-     * @param session websocket session 对象
-     * @throws Exception 异常对象
-     */
-    @Override
-    public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
-        Object sessionKey = sessionKeyGen(session);
-        WebSocketSessionHandler.addSession(sessionKey, session);
-    }
-
-    /**
-     * websocket 关闭连接时执行的动作
-     * @param session websocket session 对象
-     * @param closeStatus 关闭状态对象
-     * @throws Exception 异常对象
-     */
-    @Override
-    public void afterConnectionClosed(final WebSocketSession session, CloseStatus closeStatus) throws Exception {
-        Object sessionKey = sessionKeyGen(session);
-        WebSocketSessionHandler.removeSession(sessionKey);
-    }
-
-    public Object sessionKeyGen(WebSocketSession webSocketSession) {
-
-        Object obj = webSocketSession.getAttributes().get(WebSocketKeyDefine.LOGIN_USER);
-
-        if (obj instanceof LoginUser) {
-            LoginUser loginUser = (LoginUser) obj;
-            // userId 作为唯一区分
-            return String.valueOf(loginUser.getId());
-        }
-
-        return null;
-    }
-}

+ 83 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java

@@ -0,0 +1,83 @@
+package cn.iocoder.yudao.framework.websocket.core.handler;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.TypeUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * JSON 格式 {@link WebSocketHandler} 实现类
+ *
+ * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
+
+    /**
+     * type 与 WebSocketMessageListener 的映射
+     */
+    private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
+        listenersList.forEach((Consumer<WebSocketMessageListener>)
+                listener -> listeners.put(listener.getType(), listener));
+    }
+
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        // 1.1 空消息,跳过
+        if (message.getPayloadLength() == 0) {
+            return;
+        }
+        // 1.2 ping 心跳消息,直接返回 pong 消息。
+        if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
+            session.sendMessage(new TextMessage("pong"));
+            return;
+        }
+
+        // 2.1 解析消息
+        try {
+            JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
+            if (jsonMessage == null) {
+                log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
+                return;
+            }
+            if (StrUtil.isEmpty(jsonMessage.getType())) {
+                log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
+                return;
+            }
+            // 2.2 获得对应的 WebSocketMessageListener
+            WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
+            if (messageListener == null) {
+                log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
+                return;
+            }
+            // 2.3 处理消息
+            Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
+            Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
+            Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
+            TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
+        } catch (Throwable ex) {
+            log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
+        }
+    }
+
+}

+ 31 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java

@@ -0,0 +1,31 @@
+package cn.iocoder.yudao.framework.websocket.core.listener;
+
+import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+/**
+ * WebSocket 消息监听器接口
+ *
+ * 目的:前端发送消息给后端后,处理对应 {@link #getType()} 类型的消息
+ *
+ * @param <T> 泛型,消息类型
+ */
+public interface WebSocketMessageListener<T> {
+
+    /**
+     * 处理消息
+     *
+     * @param session Session
+     * @param message 消息
+     */
+    void onMessage(WebSocketSession session, T message);
+
+    /**
+     * 获得消息类型
+     *
+     * @see JsonWebSocketMessage#getType()
+     * @return 消息类型
+     */
+    String getType();
+
+}

+ 29 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java

@@ -0,0 +1,29 @@
+package cn.iocoder.yudao.framework.websocket.core.message;
+
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * JSON 格式的 WebSocket 消息帧
+ *
+ * @author 芋道源码
+ */
+@Data
+public class JsonWebSocketMessage implements Serializable {
+
+    /**
+     * 消息类型
+     *
+     * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类
+     */
+    private String type;
+    /**
+     * 消息内容
+     *
+     * 要求 JSON 对象
+     */
+    private String content;
+
+}

+ 42 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java

@@ -0,0 +1,42 @@
+package cn.iocoder.yudao.framework.websocket.core.security;
+
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import cn.iocoder.yudao.framework.security.core.filter.TokenAuthenticationFilter;
+import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+/**
+ * 登录用户的 {@link HandshakeInterceptor} 实现类
+ *
+ * 流程如下:
+ * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
+ * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
+ *
+ * @author 芋道源码
+ */
+public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
+
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
+        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
+        if (loginUser != null) {
+            WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
+        }
+        return true;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                               WebSocketHandler wsHandler, Exception exception) {
+        // do nothing
+    }
+
+}

+ 24 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java

@@ -0,0 +1,24 @@
+package cn.iocoder.yudao.framework.websocket.core.security;
+
+import cn.iocoder.yudao.framework.security.config.AuthorizeRequestsCustomizer;
+import cn.iocoder.yudao.framework.websocket.config.WebSocketProperties;
+import lombok.RequiredArgsConstructor;
+import org.springframework.security.config.annotation.web.builders.HttpSecurity;
+import org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer;
+
+/**
+ * WebSocket 的权限自定义
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer {
+
+    private final WebSocketProperties webSocketProperties;
+
+    @Override
+    public void customize(ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry registry) {
+        registry.antMatchers(webSocketProperties.getPath()).permitAll();
+    }
+
+}

+ 104 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java

@@ -0,0 +1,104 @@
+package cn.iocoder.yudao.framework.websocket.core.sender;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * WebSocketMessageSender 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@RequiredArgsConstructor
+public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender {
+
+    private final WebSocketSessionManager sessionManager;
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        send(null, userType, userId, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        send(null, userType, null, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        send(sessionId, null, null, messageType, messageContent);
+    }
+
+    /**
+     * 发送消息
+     *
+     * @param sessionId Session 编号
+     * @param userType 用户类型
+     * @param userId 用户编号
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) {
+        // 1. 获得 Session 列表
+        List<WebSocketSession> sessions = Collections.emptyList();
+        if (StrUtil.isNotEmpty(sessionId)) {
+            WebSocketSession session = sessionManager.getSession(sessionId);
+            if (session != null) {
+                sessions = Collections.singletonList(session);
+            }
+        } else if (userType != null && userId != null) {
+            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId);
+        } else if (userType != null) {
+            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType);
+        }
+        if (CollUtil.isEmpty(sessions)) {
+            log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]",
+                    sessionId, userType, userId, messageType, messageContent);
+        }
+        // 2. 执行发送
+        doSend(sessions, messageType, messageContent);
+    }
+
+    /**
+     * 发送消息的具体实现
+     *
+     * @param sessions Session 列表
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) {
+        JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent);
+        String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化
+        sessions.forEach(session -> {
+            // 1. 各种校验,保证 Session 可以被发送
+            if (session == null) {
+                log.error("[doSend][session 为空, message({})]", message);
+                return;
+            }
+            if (!session.isOpen()) {
+                log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message);
+                return;
+            }
+            // 2. 执行发送
+            try {
+                session.sendMessage(new TextMessage(payload));
+                log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message);
+            } catch (IOException ex) {
+                log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex);
+            }
+        });
+    }
+
+}

+ 52 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java

@@ -0,0 +1,52 @@
+package cn.iocoder.yudao.framework.websocket.core.sender;
+
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+
+/**
+ * WebSocket 消息的发送器接口
+ *
+ * @author 芋道源码
+ */
+public interface WebSocketMessageSender {
+
+    /**
+     * 发送消息给指定用户
+     *
+     * @param userType 用户类型
+     * @param userId 用户编号
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(Integer userType, Long userId, String messageType, String messageContent);
+
+    /**
+     * 发送消息给指定用户类型
+     *
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(Integer userType, String messageType, String messageContent);
+
+    /**
+     * 发送消息给指定 Session
+     *
+     * @param sessionId Session 编号
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(String sessionId, String messageType, String messageContent);
+
+    default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
+        send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+    default void sendObject(Integer userType, String messageType, Object messageContent) {
+        send(userType, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+    default void sendObject(String sessionId, String messageType, Object messageContent) {
+        send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+}

+ 35 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java

@@ -0,0 +1,35 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
+
+import lombok.Data;
+
+/**
+ * Kafka 广播 WebSocket 的消息
+ *
+ * @author 芋道源码
+ */
+@Data
+public class KafkaWebSocketMessage {
+
+    /**
+     * Session 编号
+     */
+    private String sessionId;
+    /**
+     * 用户类型
+     */
+    private Integer userType;
+    /**
+     * 用户编号
+     */
+    private Long userId;
+
+    /**
+     * 消息类型
+     */
+    private String messageType;
+    /**
+     * 消息内容
+     */
+    private String messageContent;
+
+}

+ 28 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java

@@ -0,0 +1,28 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+
+/**
+ * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+public class KafkaWebSocketMessageConsumer {
+
+    private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender;
+
+    @RabbitHandler
+    @KafkaListener(
+            topics = "${yudao.websocket.sender-kafka.topic}",
+            // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的
+            groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}")
+    public void onMessage(KafkaWebSocketMessage message) {
+        rabbitMQWebSocketMessageSender.send(message.getSessionId(),
+                message.getUserType(), message.getUserId(),
+                message.getMessageType(), message.getMessageContent());
+    }
+
+}

+ 67 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java

@@ -0,0 +1,67 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
+
+import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {
+
+    private final KafkaTemplate<Object, Object> kafkaTemplate;
+
+    private final String topic;
+
+    public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
+                                       KafkaTemplate<Object, Object> kafkaTemplate,
+                                       String topic) {
+        super(sessionManager);
+        this.kafkaTemplate = kafkaTemplate;
+        this.topic = topic;
+    }
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        sendKafkaMessage(null, userId, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        sendKafkaMessage(null, null, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        sendKafkaMessage(sessionId, null, null, messageType, messageContent);
+    }
+
+    /**
+     * 通过 Kafka 广播消息
+     *
+     * @param sessionId Session 编号
+     * @param userId 用户编号
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
+                                  String messageType, String messageContent) {
+        KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
+                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
+                .setMessageType(messageType).setMessageContent(messageContent);
+        try {
+            kafkaTemplate.send(topic, mqMessage).get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e);
+        }
+    }
+
+}

+ 20 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java

@@ -0,0 +1,20 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.local;
+
+import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+
+/**
+ * 本地的 {@link WebSocketMessageSender} 实现类
+ *
+ * 注意:仅仅适合单机场景!!!
+ *
+ * @author 芋道源码
+ */
+public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender {
+
+    public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) {
+        super(sessionManager);
+    }
+
+}

+ 37 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java

@@ -0,0 +1,37 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * RabbitMQ 广播 WebSocket 的消息
+ *
+ * @author 芋道源码
+ */
+@Data
+public class RabbitMQWebSocketMessage implements Serializable {
+
+    /**
+     * Session 编号
+     */
+    private String sessionId;
+    /**
+     * 用户类型
+     */
+    private Integer userType;
+    /**
+     * 用户编号
+     */
+    private Long userId;
+
+    /**
+     * 消息类型
+     */
+    private String messageType;
+    /**
+     * 消息内容
+     */
+    private String messageContent;
+
+}

+ 39 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java

@@ -0,0 +1,39 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.rabbit.annotation.*;
+
+/**
+ * {@link RabbitMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去
+ *
+ * @author 芋道源码
+ */
+@RabbitListener(
+        bindings = @QueueBinding(
+                value = @Queue(
+                        // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
+                        name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
+                        // Consumer 关闭时,该队列就可以被自动删除了
+                        autoDelete = "true"
+                ),
+                exchange = @Exchange(
+                        name = "${yudao.websocket.sender-rabbitmq.exchange}",
+                        type = ExchangeTypes.TOPIC,
+                        declare = "false"
+                )
+        )
+)
+@RequiredArgsConstructor
+public class RabbitMQWebSocketMessageConsumer {
+
+    private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender;
+
+    @RabbitHandler
+    public void onMessage(RabbitMQWebSocketMessage message) {
+        rabbitMQWebSocketMessageSender.send(message.getSessionId(),
+                message.getUserType(), message.getUserId(),
+                message.getMessageType(), message.getMessageContent());
+    }
+
+}

+ 62 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java

@@ -0,0 +1,62 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
+
+import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+/**
+ * 基于 RabbitMQ 的 {@link WebSocketMessageSender} 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
+
+    private final RabbitTemplate rabbitTemplate;
+
+    private final TopicExchange topicExchange;
+
+    public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
+                                          RabbitTemplate rabbitTemplate,
+                                          TopicExchange topicExchange) {
+        super(sessionManager);
+        this.rabbitTemplate = rabbitTemplate;
+        this.topicExchange = topicExchange;
+    }
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        sendRabbitMQMessage(null, userId, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        sendRabbitMQMessage(null, null, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        sendRabbitMQMessage(sessionId, null, null, messageType, messageContent);
+    }
+
+    /**
+     * 通过 RabbitMQ 广播消息
+     *
+     * @param sessionId Session 编号
+     * @param userId 用户编号
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
+                                     String messageType, String messageContent) {
+        RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
+                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
+                .setMessageType(messageType).setMessageContent(messageContent);
+        rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
+    }
+
+}

+ 34 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java

@@ -0,0 +1,34 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.redis;
+
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
+import lombok.Data;
+
+/**
+ * Redis 广播 WebSocket 的消息
+ */
+@Data
+public class RedisWebSocketMessage extends AbstractRedisChannelMessage {
+
+    /**
+     * Session 编号
+     */
+    private String sessionId;
+    /**
+     * 用户类型
+     */
+    private Integer userType;
+    /**
+     * 用户编号
+     */
+    private Long userId;
+
+    /**
+     * 消息类型
+     */
+    private String messageType;
+    /**
+     * 消息内容
+     */
+    private String messageContent;
+
+}

+ 23 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java

@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.redis;
+
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> {
+
+    private final RedisWebSocketMessageSender redisWebSocketMessageSender;
+
+    @Override
+    public void onMessage(RedisWebSocketMessage message) {
+        redisWebSocketMessageSender.send(message.getSessionId(),
+                message.getUserType(), message.getUserId(),
+                message.getMessageType(), message.getMessageContent());
+    }
+
+}

+ 57 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java

@@ -0,0 +1,57 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.redis;
+
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 基于 Redis 的 {@link WebSocketMessageSender} 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender {
+
+    private final RedisMQTemplate redisMQTemplate;
+
+    public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager,
+                                       RedisMQTemplate redisMQTemplate) {
+        super(sessionManager);
+        this.redisMQTemplate = redisMQTemplate;
+    }
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        sendRedisMessage(null, userId, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        sendRedisMessage(null, null, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        sendRedisMessage(sessionId, null, null, messageType, messageContent);
+    }
+
+    /**
+     * 通过 Redis 广播消息
+     *
+     * @param sessionId Session 编号
+     * @param userId 用户编号
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    private void sendRedisMessage(String sessionId, Long userId, Integer userType,
+                                  String messageType, String messageContent) {
+        RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
+                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
+                .setMessageType(messageType).setMessageContent(messageContent);
+        redisMQTemplate.send(mqMessage);
+    }
+
+}

+ 35 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java

@@ -0,0 +1,35 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
+
+import lombok.Data;
+
+/**
+ * RocketMQ 广播 WebSocket 的消息
+ *
+ * @author 芋道源码
+ */
+@Data
+public class RocketMQWebSocketMessage {
+
+    /**
+     * Session 编号
+     */
+    private String sessionId;
+    /**
+     * 用户类型
+     */
+    private Integer userType;
+    /**
+     * 用户编号
+     */
+    private Long userId;
+
+    /**
+     * 消息类型
+     */
+    private String messageType;
+    /**
+     * 消息内容
+     */
+    private String messageContent;
+
+}

+ 30 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java

@@ -0,0 +1,30 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+
+/**
+ * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去
+ *
+ * @author 芋道源码
+ */
+@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
+        topic = "${yudao.websocket.sender-rocketmq.topic}",
+        consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}",
+        messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息
+)
+@RequiredArgsConstructor
+public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> {
+
+    private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender;
+
+    @Override
+    public void onMessage(RocketMQWebSocketMessage message) {
+        rocketMQWebSocketMessageSender.send(message.getSessionId(),
+                message.getUserType(), message.getUserId(),
+                message.getMessageType(), message.getMessageContent());
+    }
+
+}

+ 61 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java

@@ -0,0 +1,61 @@
+package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
+
+import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+
+/**
+ * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
+
+    private final RocketMQTemplate rocketMQTemplate;
+
+    private final String topic;
+
+    public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
+                                          RocketMQTemplate rocketMQTemplate,
+                                          String topic) {
+        super(sessionManager);
+        this.rocketMQTemplate = rocketMQTemplate;
+        this.topic = topic;
+    }
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        sendRocketMQMessage(null, userId, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        sendRocketMQMessage(null, null, userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
+    }
+
+    /**
+     * 通过 RocketMQ 广播消息
+     *
+     * @param sessionId Session 编号
+     * @param userId 用户编号
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容
+     */
+    private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
+                                     String messageType, String messageContent) {
+        RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
+                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
+                .setMessageType(messageType).setMessageContent(messageContent);
+        rocketMQTemplate.syncSend(topic, mqMessage);
+    }
+
+}

+ 49 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java

@@ -0,0 +1,49 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
+import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
+
+/**
+ * {@link WebSocketHandler} 的装饰类,实现了以下功能:
+ *
+ * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理
+ * 2. 封装 {@link WebSocketSession} 支持并发操作
+ *
+ * @author 芋道源码
+ */
+public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {
+
+    /**
+     * 发送时间的限制,单位:毫秒
+     */
+    private static final Integer SEND_TIME_LIMIT = 1000 * 5;
+    /**
+     * 发送消息缓冲上线,单位:bytes
+     */
+    private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;
+
+    private final WebSocketSessionManager sessionManager;
+
+    public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,
+                                            WebSocketSessionManager sessionManager) {
+        super(delegate);
+        this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) {
+        // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
+        session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);
+        // 添加到 WebSocketSessionManager 中
+        sessionManager.addSession(session);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
+        sessionManager.removeSession(session);
+    }
+
+}

+ 53 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java

@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Collection;
+
+/**
+ * {@link WebSocketSession} 管理器的接口
+ *
+ * @author 芋道源码
+ */
+public interface WebSocketSessionManager {
+
+    /**
+     * 添加 Session
+     *
+     * @param session Session
+     */
+    void addSession(WebSocketSession session);
+
+    /**
+     * 移除 Session
+     *
+     * @param session Session
+     */
+    void removeSession(WebSocketSession session);
+
+    /**
+     * 获得指定编号的 Session
+     *
+     * @param id Session 编号
+     * @return Session
+     */
+    WebSocketSession getSession(String id);
+
+    /**
+     * 获得指定用户类型的 Session 列表
+     *
+     * @param userType 用户类型
+     * @return Session 列表
+     */
+    Collection<WebSocketSession> getSessionList(Integer userType);
+
+    /**
+     * 获得指定用户编号的 Session 列表
+     *
+     * @param userType 用户类型
+     * @param userId 用户编号
+     * @return Session 列表
+     */
+    Collection<WebSocketSession> getSessionList(Integer userType, Long userId);
+
+}

+ 125 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java

@@ -0,0 +1,125 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * 默认的 {@link WebSocketSessionManager} 实现类
+ *
+ * @author 芋道源码
+ */
+public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
+
+    /**
+     * id 与 WebSocketSession 映射
+     *
+     * key:Session 编号
+     */
+    private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();
+
+    /**
+     * user 与 WebSocketSession 映射
+     *
+     * key1:用户类型
+     * key2:用户编号
+     */
+    private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
+            = new ConcurrentHashMap<>();
+
+    @Override
+    public void addSession(WebSocketSession session) {
+        // 添加到 idSessions 中
+        idSessions.put(session.getId(), session);
+        // 添加到 userSessions 中
+        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
+        if (user == null) {
+            return;
+        }
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
+        if (userSessionsMap == null) {
+            userSessionsMap = new ConcurrentHashMap<>();
+            if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
+                userSessionsMap = userSessions.get(user.getUserType());
+            }
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
+        if (sessions == null) {
+            sessions = new CopyOnWriteArrayList<>();
+            if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
+                sessions = userSessionsMap.get(user.getId());
+            }
+        }
+        sessions.add(session);
+    }
+
+    @Override
+    public void removeSession(WebSocketSession session) {
+        // 移除从 idSessions 中
+        idSessions.remove(session.getId(), session);
+        // 移除从 idSessions 中
+        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
+        if (user == null) {
+            return;
+        }
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
+        if (userSessionsMap == null) {
+            return;
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
+        sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
+        if (CollUtil.isEmpty(sessions)) {
+            userSessionsMap.remove(user.getId(), sessions);
+        }
+    }
+
+    @Override
+    public WebSocketSession getSession(String id) {
+        return idSessions.get(id);
+    }
+
+    @Override
+    public Collection<WebSocketSession> getSessionList(Integer userType) {
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
+        if (CollUtil.isEmpty(userSessionsMap)) {
+            return new ArrayList<>();
+        }
+        LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
+        Long contextTenantId = TenantContextHolder.getTenantId();
+        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
+            if (CollUtil.isEmpty(sessions)) {
+                continue;
+            }
+            // 特殊:如果租户不匹配,则直接排除
+            if (contextTenantId != null) {
+                Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
+                if (!contextTenantId.equals(userTenantId)) {
+                    continue;
+                }
+            }
+            result.addAll(sessions);
+        }
+        return result;
+    }
+
+    @Override
+    public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
+        if (CollUtil.isEmpty(userSessionsMap)) {
+            return new ArrayList<>();
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
+        return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
+    }
+
+}

+ 67 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java

@@ -0,0 +1,67 @@
+package cn.iocoder.yudao.framework.websocket.core.util;
+
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Map;
+
+/**
+ * 专属于 web 包的工具类
+ *
+ * @author 芋道源码
+ */
+public class WebSocketFrameworkUtils {
+
+    public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER";
+
+    /**
+     * 设置当前用户
+     *
+     * @param loginUser 登录用户
+     * @param attributes Session
+     */
+    public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) {
+        attributes.put(ATTRIBUTE_LOGIN_USER, loginUser);
+    }
+
+    /**
+     * 获取当前用户
+     *
+     * @return 当前用户
+     */
+    public static LoginUser getLoginUser(WebSocketSession session) {
+        return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER);
+    }
+
+    /**
+     * 获得当前用户的编号
+     *
+     * @return 用户编号
+     */
+    public static Long getLoginUserId(WebSocketSession session) {
+        LoginUser loginUser = getLoginUser(session);
+        return loginUser != null ? loginUser.getId() : null;
+    }
+
+    /**
+     * 获得当前用户的类型
+     *
+     * @return 用户编号
+     */
+    public static Integer getLoginUserType(WebSocketSession session) {
+        LoginUser loginUser = getLoginUser(session);
+        return loginUser != null ? loginUser.getUserType() : null;
+    }
+
+    /**
+     * 获得当前用户的租户编号
+     *
+     * @param session Session
+     * @return 租户编号
+     */
+    public static Long getTenantId(WebSocketSession session) {
+        LoginUser loginUser = getLoginUser(session);
+        return loginUser != null ? loginUser.getTenantId() : null;
+    }
+
+}

+ 3 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java

@@ -1 +1,4 @@
+/**
+ * WebSocket 框架,支持多节点的广播
+ */
 package cn.iocoder.yudao.framework.websocket;

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md

@@ -0,0 +1 @@
+<http://www.iocoder.cn/Spring-Boot/WebSocket/?yudao>

+ 54 - 0
yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java

@@ -0,0 +1,54 @@
+package cn.iocoder.yudao.module.infra.api.websocket;
+
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+
+/**
+ * WebSocket 发送器的 API 接口
+ *
+ * 对 WebSocketMessageSender 进行封装,提供给其它模块使用
+ *
+ * @author 芋道源码
+ */
+public interface WebSocketSenderApi {
+
+    /**
+     * 发送消息给指定用户
+     *
+     * @param userType 用户类型
+     * @param userId 用户编号
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(Integer userType, Long userId, String messageType, String messageContent);
+
+    /**
+     * 发送消息给指定用户类型
+     *
+     * @param userType 用户类型
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(Integer userType, String messageType, String messageContent);
+
+    /**
+     * 发送消息给指定 Session
+     *
+     * @param sessionId Session 编号
+     * @param messageType 消息类型
+     * @param messageContent 消息内容,JSON 格式
+     */
+    void send(String sessionId, String messageType, String messageContent);
+
+    default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
+        send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+    default void sendObject(Integer userType, String messageType, Object messageContent) {
+        send(userType, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+    default void sendObject(String sessionId, String messageType, Object messageContent) {
+        send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
+    }
+
+}

+ 5 - 5
yudao-module-infra/yudao-module-infra-biz/pom.xml

@@ -46,6 +46,11 @@
             <artifactId>yudao-spring-boot-starter-security</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-websocket</artifactId>
+        </dependency>
+
         <!-- DB 相关 -->
         <dependency>
             <groupId>cn.iocoder.boot</groupId>
@@ -116,11 +121,6 @@
             <artifactId>yudao-spring-boot-starter-file</artifactId>
         </dependency>
 
-        <!-- WebSocket -->
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-websocket</artifactId>
-        </dependency>
     </dependencies>
 
 </project>

+ 34 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java

@@ -0,0 +1,34 @@
+package cn.iocoder.yudao.module.infra.api.websocket;
+
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * WebSocket 发送器的 API 实现类
+ *
+ * @author 芋道源码
+ */
+@Component
+public class WebSocketSenderApiImpl implements WebSocketSenderApi {
+
+    @Resource
+    private WebSocketMessageSender webSocketMessageSender;
+
+    @Override
+    public void send(Integer userType, Long userId, String messageType, String messageContent) {
+        webSocketMessageSender.send(userType, userId, messageType, messageContent);
+    }
+
+    @Override
+    public void send(Integer userType, String messageType, String messageContent) {
+        webSocketMessageSender.send(userType, messageType, messageContent);
+    }
+
+    @Override
+    public void send(String sessionId, String messageType, String messageContent) {
+        webSocketMessageSender.send(sessionId, messageType, messageContent);
+    }
+
+}

+ 48 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java

@@ -0,0 +1,48 @@
+package cn.iocoder.yudao.module.infra.websocket;
+
+import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import cn.iocoder.yudao.module.infra.websocket.message.DemoReceiveMessage;
+import cn.iocoder.yudao.module.infra.websocket.message.DemoSendMessage;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketSession;
+
+import javax.annotation.Resource;
+
+/**
+ * WebSocket 示例:单发消息
+ *
+ * @author 芋道源码
+ */
+@Component
+public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
+
+    @Resource
+    private WebSocketMessageSender webSocketMessageSender;
+
+    @Override
+    public void onMessage(WebSocketSession session, DemoSendMessage message) {
+        Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
+        // 情况一:单发
+        if (message.getToUserId() != null) {
+            DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
+                    .setText(message.getText()).setSingle(true);
+            webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户
+                    "demo-message-receive", toMessage);
+            return;
+        }
+        // 情况二:群发
+        DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
+                .setText(message.getText()).setSingle(false);
+        webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户
+                "demo-message-receive", toMessage);
+    }
+
+    @Override
+    public String getType() {
+        return "demo-message-send";
+    }
+
+}

+ 0 - 45
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java

@@ -1,45 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * 信号量相关处理
- *
- */
-@Slf4j
-public class SemaphoreUtils {
-
-    /**
-     * 获取信号量
-     *
-     * @param semaphore
-     * @return
-     */
-    public static boolean tryAcquire(Semaphore semaphore) {
-        boolean flag = false;
-
-        try {
-            flag = semaphore.tryAcquire();
-        } catch (Exception e) {
-            log.error("获取信号量异常", e);
-        }
-
-        return flag;
-    }
-
-    /**
-     * 释放信号量
-     *
-     * @param semaphore
-     */
-    public static void release(Semaphore semaphore) {
-
-        try {
-            semaphore.release();
-        } catch (Exception e) {
-            log.error("释放信号量异常", e);
-        }
-    }
-}

+ 0 - 16
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java

@@ -1,16 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
-/**
- * websocket 配置
- */
-@Configuration
-public class WebSocketConfig {
-    @Bean
-    public ServerEndpointExporter serverEndpointExporter() {
-        return new ServerEndpointExporter();
-    }
-}

+ 0 - 86
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java

@@ -1,86 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.ServerEndpoint;
-import java.util.concurrent.Semaphore;
-
-/**
- * websocket 消息处理
- */
-@Component
-@ServerEndpoint("/websocket/message")
-@Slf4j
-public class WebSocketServer {
-
-    /**
-     * 默认最多允许同时在线用户数100
-     */
-    public static int socketMaxOnlineCount = 100;
-
-    private static final Semaphore SOCKET_SEMAPHORE = new Semaphore(socketMaxOnlineCount);
-
-    /**
-     * 连接建立成功调用的方法
-     */
-    @OnOpen
-    public void onOpen(Session session) throws Exception {
-        // 尝试获取信号量
-        boolean semaphoreFlag = SemaphoreUtils.tryAcquire(SOCKET_SEMAPHORE);
-        if (!semaphoreFlag) {
-            // 未获取到信号量
-            log.error("当前在线人数超过限制数:{}", socketMaxOnlineCount);
-            WebSocketUsers.sendMessage(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
-            session.close();
-        } else {
-            String userId = WebSocketUsers.getParam("userId", session);
-            if (userId != null) {
-                // 添加用户
-                WebSocketUsers.addSession(userId, session);
-                log.info("用户【userId={}】建立连接,当前连接用户总数:{}", userId, WebSocketUsers.getUsers().size());
-                WebSocketUsers.sendMessage(session, "接收内容:连接成功");
-            } else {
-                WebSocketUsers.sendMessage(session, "接收内容:连接失败");
-            }
-        }
-    }
-
-    /**
-     * 连接关闭时处理
-     */
-    @OnClose
-    public void onClose(Session session) {
-        log.info("用户【sessionId={}】关闭连接!", session.getId());
-        // 移除用户
-        WebSocketUsers.removeSession(session);
-        // 获取到信号量则需释放
-        SemaphoreUtils.release(SOCKET_SEMAPHORE);
-    }
-
-    /**
-     * 抛出异常时处理
-     */
-    @OnError
-    public void onError(Session session, Throwable exception) throws Exception {
-        if (session.isOpen()) {
-            // 关闭连接
-            session.close();
-        }
-        String sessionId = session.getId();
-        log.info("用户【sessionId={}】连接异常!异常信息:{}", sessionId, exception);
-        // 移出用户
-        WebSocketUsers.removeSession(session);
-        // 获取到信号量则需释放
-        SemaphoreUtils.release(SOCKET_SEMAPHORE);
-    }
-
-    /**
-     * 收到客户端消息时调用的方法
-     */
-    @OnMessage
-    public void onMessage(Session session, String message) {
-        WebSocketUsers.sendMessage(session, "接收内容:" + message);
-    }
-}

+ 0 - 178
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java

@@ -1,178 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import cn.hutool.core.map.MapUtil;
-import cn.hutool.core.util.StrUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.bouncycastle.util.Strings;
-
-import javax.validation.constraints.NotNull;
-import javax.websocket.Session;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * websocket 客户端用户
- */
-@Slf4j
-public class WebSocketUsers {
-
-    /**
-     * 用户集
-     *  TODO 需要登录用户的session?
-     */
-    private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
-
-    /**
-     * 存储用户
-     *
-     * @param userId  唯一键
-     * @param session 用户信息
-     */
-    public static void addSession(String userId, Session session) {
-        SESSION_MAP.put(userId, session);
-    }
-
-    /**
-     * 移除用户
-     *
-     * @param session 用户信息
-     * @return 移除结果
-     */
-    public static boolean removeSession(Session session) {
-        String key = null;
-        boolean flag = SESSION_MAP.containsValue(session);
-        if (flag) {
-            Set<Map.Entry<String, Session>> entries = SESSION_MAP.entrySet();
-            for (Map.Entry<String, Session> entry : entries) {
-                Session value = entry.getValue();
-                if (value.equals(session)) {
-                    key = entry.getKey();
-                    break;
-                }
-            }
-        } else {
-            return true;
-        }
-        return removeSession(key);
-    }
-
-    /**
-     * 移出用户
-     *
-     * @param userId 用户id
-     */
-    public static boolean removeSession(String userId) {
-        log.info("用户【userId={}】退出", userId);
-        Session remove = SESSION_MAP.remove(userId);
-        if (remove != null) {
-            boolean containsValue = SESSION_MAP.containsValue(remove);
-            log.info("用户【userId={}】退出{},当前连接用户总数:{}", userId, containsValue ? "失败" : "成功", SESSION_MAP.size());
-            return containsValue;
-        } else {
-            return true;
-        }
-    }
-
-    /**
-     * 获取在线用户列表
-     *
-     * @return 返回用户集合
-     */
-    public static Map<String, Session> getUsers() {
-        return SESSION_MAP;
-    }
-
-    /**
-     * 向所有在线人发送消息
-     *
-     * @param message 消息内容
-     */
-    public static void sendMessageToAll(String message) {
-        SESSION_MAP.forEach((userId, session) -> {
-            if (session.isOpen()) {
-                sendMessage(session, message);
-            }
-        });
-    }
-
-    /**
-     * 异步发送文本消息
-     *
-     * @param session 用户session
-     * @param message 消息内容
-     */
-    public static void sendMessageAsync(Session session, String message) {
-        if (session.isOpen()) {
-            // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程?
-            session.getAsyncRemote().sendText(message);
-        } else {
-            log.warn("用户【session={}】不在线", session.getId());
-        }
-    }
-
-    /**
-     * 同步发送文本消息
-     *
-     * @param session 用户session
-     * @param message 消息内容
-     */
-    public static void sendMessage(Session session, String message) {
-        try {
-            if (session.isOpen()) {
-                // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程?
-                session.getBasicRemote().sendText(message);
-            } else {
-                log.warn("用户【session={}】不在线", session.getId());
-            }
-        } catch (IOException e) {
-            log.error("发送消息异常", e);
-        }
-
-    }
-
-    /**
-     * 根据用户id发送消息
-     *
-     * @param userId  用户id
-     * @param message 消息内容
-     */
-    public static void sendMessage(String userId, String message) {
-        Session session = SESSION_MAP.get(userId);
-        //判断是否存在该用户的session,并且是否在线
-        if (session == null || !session.isOpen()) {
-            return;
-        }
-        sendMessage(session, message);
-    }
-
-
-    /**
-     * 获取session中的指定参数值
-     *
-     * @param key     参数key
-     * @param session 用户session
-     */
-    public static String getParam(@NotNull String key, Session session) {
-        //TODO 目前只针对获取一个key的值,后期根据情况拓展多个 或者直接在onClose onOpen上获取参数?
-        String value = null;
-        Map<String, List<String>> parameters = session.getRequestParameterMap();
-        if (MapUtil.isNotEmpty(parameters)) {
-            value = parameters.get(key).get(0);
-        } else {
-            String queryString = session.getQueryString();
-            if (!StrUtil.isEmpty(queryString)) {
-                String[] params = Strings.split(queryString, '&');
-                for (String paramPair : params) {
-                    String[] nameValues = Strings.split(paramPair, '=');
-                    if (key.equals(nameValues[0])) {
-                        value = nameValues[1];
-                    }
-                }
-            }
-        }
-        return value;
-    }
-}

+ 27 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java

@@ -0,0 +1,27 @@
+package cn.iocoder.yudao.module.infra.websocket.message;
+
+import lombok.Data;
+
+/**
+ * 示例:server -> client 同步消息
+ *
+ * @author 芋道源码
+ */
+@Data
+public class DemoReceiveMessage {
+
+    /**
+     * 接收人的编号
+     */
+    private Long fromUserId;
+    /**
+     * 内容
+     */
+    private String text;
+
+    /**
+     * 是否单聊
+     */
+    private Boolean single;
+
+}

+ 24 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java

@@ -0,0 +1,24 @@
+package cn.iocoder.yudao.module.infra.websocket.message;
+
+import lombok.Data;
+
+/**
+ * 示例:client -> server 发送消息
+ *
+ * @author 芋道源码
+ */
+@Data
+public class DemoSendMessage {
+
+    /**
+     * 发送给谁
+     *
+     * 如果为空,说明发送给所有人
+     */
+    private Long toUserId;
+    /**
+     * 内容
+     */
+    private String text;
+
+}

+ 2 - 1
yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java

@@ -53,7 +53,8 @@ public class AppAuthController {
     @PermitAll
     @Operation(summary = "登出系统")
     public CommonResult<Boolean> logout(HttpServletRequest request) {
-        String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader());
+        String token = SecurityFrameworkUtils.obtainAuthorization(request,
+                securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
         if (StrUtil.isNotBlank(token)) {
             authService.logout(token);
         }

+ 3 - 2
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java

@@ -7,6 +7,7 @@ import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
 import cn.iocoder.yudao.framework.common.pojo.CommonResult;
 import cn.iocoder.yudao.framework.operatelog.core.annotations.OperateLog;
 import cn.iocoder.yudao.framework.security.config.SecurityProperties;
+import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
 import cn.iocoder.yudao.module.system.controller.admin.auth.vo.*;
 import cn.iocoder.yudao.module.system.convert.auth.AuthConvert;
 import cn.iocoder.yudao.module.system.dal.dataobject.permission.MenuDO;
@@ -38,7 +39,6 @@ import java.util.Set;
 import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
 import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
 import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.getLoginUserId;
-import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.obtainAuthorization;
 
 @Tag(name = "管理后台 - 认证")
 @RestController
@@ -76,7 +76,8 @@ public class AuthController {
     @Operation(summary = "登出系统")
     @OperateLog(enable = false) // 避免 Post 请求被记录操作日志
     public CommonResult<Boolean> logout(HttpServletRequest request) {
-        String token = obtainAuthorization(request, securityProperties.getTokenHeader());
+        String token = SecurityFrameworkUtils.obtainAuthorization(request,
+                securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
         if (StrUtil.isNotBlank(token)) {
             authService.logout(token, LoginLogTypeEnum.LOGOUT_SELF.getType());
         }

+ 19 - 0
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java

@@ -1,12 +1,16 @@
 package cn.iocoder.yudao.module.system.controller.admin.notice;
 
+import cn.hutool.core.lang.Assert;
+import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
 import cn.iocoder.yudao.framework.common.pojo.CommonResult;
 import cn.iocoder.yudao.framework.common.pojo.PageResult;
+import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
 import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO;
 import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO;
 import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO;
 import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO;
 import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert;
+import cn.iocoder.yudao.module.system.dal.dataobject.notice.NoticeDO;
 import cn.iocoder.yudao.module.system.service.notice.NoticeService;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import io.swagger.v3.oas.annotations.Parameter;
@@ -29,6 +33,9 @@ public class NoticeController {
     @Resource
     private NoticeService noticeService;
 
+    @Resource
+    private WebSocketSenderApi webSocketSenderApi;
+
     @PostMapping("/create")
     @Operation(summary = "创建通知公告")
     @PreAuthorize("@ss.hasPermission('system:notice:create')")
@@ -69,4 +76,16 @@ public class NoticeController {
         return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id)));
     }
 
+    @PostMapping("/push")
+    @Operation(summary = "推送通知公告", description = "只发送给 websocket 连接在线的用户")
+    @Parameter(name = "id", description = "编号", required = true, example = "1024")
+    @PreAuthorize("@ss.hasPermission('system:notice:update')")
+    public CommonResult<Boolean> push(@RequestParam("id") Long id) {
+        NoticeDO notice = noticeService.getNotice(id);
+        Assert.notNull(notice, "公告不能为空");
+        // 通过 websocket 推送给在线的用户
+        webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), "notice-push", notice);
+        return success(true);
+    }
+
 }

+ 2 - 2
yudao-server/src/main/resources/application-local.yaml

@@ -123,8 +123,8 @@ spring:
   rabbitmq:
     host: 127.0.0.1 # RabbitMQ 服务的地址
     port: 5672 # RabbitMQ 服务的端口
-    username: guest # RabbitMQ 服务的账号
-    password: guest # RabbitMQ 服务的密码
+    username: rabbit # RabbitMQ 服务的账号
+    password: rabbit # RabbitMQ 服务的密码
   # Kafka 配置项,对应 KafkaProperties 配置类
   kafka:
     bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

+ 11 - 3
yudao-server/src/main/resources/application.yaml

@@ -146,9 +146,17 @@ yudao:
       - /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录
   websocket:
     enable: true # websocket的开关
-    path: /websocket/message # 路径
-    maxOnlineCount: 0 # 最大连接人数
-    sessionMap: true # 保存sessionMap
+    path: /infra/ws # 路径
+    sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq
+    sender-rocketmq:
+      topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic
+      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group
+    sender-rabbitmq:
+      exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange
+      queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue
+    sender-kafka:
+      topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic
+      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group
   swagger:
     title: 芋道快速开发平台
     description: 提供管理后台、用户 App 的所有功能