Browse Source

websocket:重新封装 websocket 组件,支持 token 认证,并增加 WebSocketMessageListener 方便处理消息

YunaiV 1 year ago
parent
commit
1941a7b3e6
27 changed files with 539 additions and 540 deletions
  1. 6 0
      yudao-dependencies/pom.xml
  2. 6 2
      yudao-framework/yudao-spring-boot-starter-websocket/pom.xml
  3. 0 14
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java
  4. 3 10
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java
  5. 37 8
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java
  6. 0 24
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java
  7. 0 9
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java
  8. 0 24
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java
  9. 0 36
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java
  10. 0 31
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java
  11. 0 49
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java
  12. 80 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java
  13. 31 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java
  14. 27 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java
  15. 42 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java
  16. 24 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java
  17. 49 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java
  18. 53 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java
  19. 113 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java
  20. 59 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java
  21. 3 0
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java
  22. 5 5
      yudao-module-infra/yudao-module-infra-biz/pom.xml
  23. 0 45
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java
  24. 0 16
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java
  25. 0 86
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java
  26. 0 178
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java
  27. 1 3
      yudao-server/src/main/resources/application.yaml

+ 6 - 0
yudao-dependencies/pom.xml

@@ -175,6 +175,12 @@
                 <version>${revision}</version>
             </dependency>
 
+            <dependency>
+                <groupId>cn.iocoder.boot</groupId>
+                <artifactId>yudao-spring-boot-starter-websocket</artifactId>
+                <version>${revision}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.github.xiaoymin</groupId>
                 <artifactId>knife4j-openapi3-spring-boot-starter</artifactId>

+ 6 - 2
yudao-framework/yudao-spring-boot-starter-websocket/pom.xml

@@ -12,18 +12,22 @@
     <packaging>jar</packaging>
 
     <name>${project.artifactId}</name>
-    <description>WebSocket</description>
+    <description>WebSocket 框架,支持多节点的广播</description>
     <url>https://github.com/YunaiV/ruoyi-vue-pro</url>
 
 
     <dependencies>
-
         <dependency>
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-common</artifactId>
         </dependency>
 
+        <!-- Web 相关 -->
         <dependency>
+            <!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢?
+                 因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。
+                 如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。
+            -->
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-security</artifactId>
         </dependency>

+ 0 - 14
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java

@@ -1,14 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.config;
-
-import cn.iocoder.yudao.framework.websocket.core.UserHandshakeInterceptor;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.web.socket.server.HandshakeInterceptor;
-
-@EnableConfigurationProperties(WebSocketProperties.class)
-public class WebSocketHandlerConfig {
-    @Bean
-    public HandshakeInterceptor handshakeInterceptor() {
-        return new UserHandshakeInterceptor();
-    }
-}

+ 3 - 10
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java

@@ -15,15 +15,8 @@ import org.springframework.validation.annotation.Validated;
 public class WebSocketProperties {
 
     /**
-     * 路径
+     * WebSocket 的连接路径
      */
-    private String path = "";
-    /**
-     * 默认最多允许同时在线用户数
-     */
-    private int maxOnlineCount = 0;
-    /**
-     * 是否保存session
-     */
-    private boolean sessionMap = true;
+    private String path = "/ws";
+
 }

+ 37 - 8
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java

@@ -1,11 +1,17 @@
 package cn.iocoder.yudao.framework.websocket.config;
 
+import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
+import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
 import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
 import org.springframework.web.socket.server.HandshakeInterceptor;
 
@@ -17,18 +23,41 @@ import java.util.List;
  * @author xingyu4j
  */
 @AutoConfiguration
-// 允许使用 yudao.websocket.enable=false 禁用websocket
-@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true)
+@EnableWebSocket // 开启 websocket
+@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
+
 @EnableConfigurationProperties(WebSocketProperties.class)
 public class YudaoWebSocketAutoConfiguration {
+
     @Bean
-    @ConditionalOnMissingBean
-    public WebSocketConfigurer webSocketConfigurer(List<HandshakeInterceptor> handshakeInterceptor,
+    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,
                                                    WebSocketHandler webSocketHandler,
                                                    WebSocketProperties webSocketProperties) {
-
         return registry -> registry
+                // 添加 WebSocketHandler
                 .addHandler(webSocketHandler, webSocketProperties.getPath())
-                .addInterceptors(handshakeInterceptor.toArray(new HandshakeInterceptor[0]));
+                .addInterceptors(handshakeInterceptors)
+                // 允许跨域,否则前端连接会直接断开
+                .setAllowedOriginPatterns("*");
     }
-}
+
+    @Bean
+    public HandshakeInterceptor handshakeInterceptor() {
+        return new LoginUserHandshakeInterceptor();
+    }
+
+    @Bean
+    public WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager,
+                                             List<? extends WebSocketMessageListener<?>> messageListeners) {
+        // 1. 创建 JsonWebSocketMessageHandler 对象,处理消息
+        JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners);
+        // 2. 创建 WebSocketSessionHandlerDecorator 对象,处理连接
+        return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager);
+    }
+
+    @Bean
+    public WebSocketSessionManager webSocketSessionManager() {
+        return new WebSocketSessionManagerImpl();
+    }
+
+}

+ 0 - 24
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java

@@ -1,24 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import cn.iocoder.yudao.framework.security.core.LoginUser;
-import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
-import org.springframework.http.server.ServerHttpRequest;
-import org.springframework.http.server.ServerHttpResponse;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.server.HandshakeInterceptor;
-
-import java.util.Map;
-
-public class UserHandshakeInterceptor implements HandshakeInterceptor {
-    @Override
-    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
-        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
-        attributes.put(WebSocketKeyDefine.LOGIN_USER, loginUser);
-        return true;
-    }
-
-    @Override
-    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
-
-    }
-}

+ 0 - 9
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java

@@ -1,9 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-
-import lombok.Data;
-
-@Data
-public class WebSocketKeyDefine {
-    public static final String LOGIN_USER ="LOGIN_USER";
-}

+ 0 - 24
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java

@@ -1,24 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.List;
-
-@Data
-@Accessors(chain = true)
-public class WebSocketMessageDO {
-    /**
-     * 接收消息的seesion
-     */
-    private List<Object> seesionKeyList;
-    /**
-     * 发送消息
-     */
-    private String msgText;
-
-    public static WebSocketMessageDO build(List<Object> seesionKeyList, String msgText) {
-        return new WebSocketMessageDO().setMsgText(msgText).setSeesionKeyList(seesionKeyList);
-    }
-
-}

+ 0 - 36
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java

@@ -1,36 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import org.springframework.web.socket.WebSocketSession;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public final class WebSocketSessionHandler {
-    private WebSocketSessionHandler() {
-    }
-
-    private static final Map<String, WebSocketSession> SESSION_MAP = new ConcurrentHashMap<>();
-
-    public static void addSession(Object sessionKey, WebSocketSession session) {
-        SESSION_MAP.put(sessionKey.toString(), session);
-    }
-
-    public static void removeSession(Object sessionKey) {
-        SESSION_MAP.remove(sessionKey.toString());
-    }
-
-    public static WebSocketSession getSession(Object sessionKey) {
-        return SESSION_MAP.get(sessionKey.toString());
-    }
-
-    public static Collection<WebSocketSession> getSessions() {
-        return SESSION_MAP.values();
-    }
-
-    public static Set<String> getSessionKeys() {
-        return SESSION_MAP.keySet();
-    }
-
-}

+ 0 - 31
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java

@@ -1,31 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.io.IOException;
-
-@Slf4j
-public class WebSocketUtils {
-    public static boolean sendMessage(WebSocketSession seesion, String message) {
-        if (seesion == null) {
-            log.error("seesion 不存在");
-            return false;
-        }
-        if (seesion.isOpen()) {
-            try {
-                seesion.sendMessage(new TextMessage(message));
-            } catch (IOException e) {
-                log.error("WebSocket 消息发送异常 Session={} | msg= {} | exception={}", seesion, message, e);
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public static boolean sendMessage(Object sessionKey, String message) {
-        WebSocketSession session = WebSocketSessionHandler.getSession(sessionKey);
-        return sendMessage(session, message);
-    }
-}

+ 0 - 49
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java

@@ -1,49 +0,0 @@
-package cn.iocoder.yudao.framework.websocket.core;
-
-import cn.iocoder.yudao.framework.security.core.LoginUser;
-import org.springframework.web.socket.CloseStatus;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.WebSocketSession;
-import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
-
-public class YudaoWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
-    public YudaoWebSocketHandlerDecorator(WebSocketHandler delegate) {
-        super(delegate);
-    }
-
-    /**
-     * websocket 连接时执行的动作
-     * @param session websocket session 对象
-     * @throws Exception 异常对象
-     */
-    @Override
-    public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
-        Object sessionKey = sessionKeyGen(session);
-        WebSocketSessionHandler.addSession(sessionKey, session);
-    }
-
-    /**
-     * websocket 关闭连接时执行的动作
-     * @param session websocket session 对象
-     * @param closeStatus 关闭状态对象
-     * @throws Exception 异常对象
-     */
-    @Override
-    public void afterConnectionClosed(final WebSocketSession session, CloseStatus closeStatus) throws Exception {
-        Object sessionKey = sessionKeyGen(session);
-        WebSocketSessionHandler.removeSession(sessionKey);
-    }
-
-    public Object sessionKeyGen(WebSocketSession webSocketSession) {
-
-        Object obj = webSocketSession.getAttributes().get(WebSocketKeyDefine.LOGIN_USER);
-
-        if (obj instanceof LoginUser) {
-            LoginUser loginUser = (LoginUser) obj;
-            // userId 作为唯一区分
-            return String.valueOf(loginUser.getId());
-        }
-
-        return null;
-    }
-}

+ 80 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java

@@ -0,0 +1,80 @@
+package cn.iocoder.yudao.framework.websocket.core.handler;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.TypeUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * JSON 格式 {@link WebSocketHandler} 实现类
+ *
+ * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
+
+    /**
+     * type 与 WebSocketMessageListener 的映射
+     */
+    private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
+        listenersList.forEach((Consumer<WebSocketMessageListener>)
+                listener -> listeners.put(listener.getType(), listener));
+    }
+
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        // 1.1 空消息,跳过
+        if (message.getPayloadLength() == 0) {
+            return;
+        }
+        // 1.2 ping 心跳消息,直接返回 pong 消息。
+        if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
+            session.sendMessage(new TextMessage("pong"));
+            return;
+        }
+
+        // 2.1 解析消息
+        try {
+            JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
+            if (jsonMessage == null) {
+                log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
+                return;
+            }
+            if (StrUtil.isEmpty(jsonMessage.getType())) {
+                log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
+                return;
+            }
+            // 2.2 获得对应的 WebSocketMessageListener
+            WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
+            if (messageListener == null) {
+                log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
+                return;
+            }
+            // 2.3 处理消息
+            Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
+            Object messageObj = JsonUtils.parseObject(jsonMessage.getMessage(), type);
+            messageListener.onMessage(session, messageObj);
+        } catch (Throwable ex) {
+            log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
+        }
+    }
+
+}

+ 31 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java

@@ -0,0 +1,31 @@
+package cn.iocoder.yudao.framework.websocket.core.listener;
+
+import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+/**
+ * WebSocket 消息监听器接口
+ *
+ * 目的:前端发送消息给后端后,处理对应 {@link #getType()} 类型的消息
+ *
+ * @param <T> 泛型,消息类型
+ */
+public interface WebSocketMessageListener<T> {
+
+    /**
+     * 处理消息
+     *
+     * @param session Session
+     * @param message 消息
+     */
+    void onMessage(WebSocketSession session, T message);
+
+    /**
+     * 获得消息类型
+     *
+     * @see JsonWebSocketMessage#getType()
+     * @return 消息类型
+     */
+    String getType();
+
+}

+ 27 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java

@@ -0,0 +1,27 @@
+package cn.iocoder.yudao.framework.websocket.core.message;
+
+import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
+import lombok.Data;
+
+/**
+ * JSON 格式的 WebSocket 消息帧
+ *
+ * @author 芋道源码
+ */
+@Data
+public class JsonWebSocketMessage {
+
+    /**
+     * 消息类型
+     *
+     * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类
+     */
+    private String type;
+    /**
+     * 消息内容
+     *
+     * 要求 JSON 对象
+     */
+    private String message;
+
+}

+ 42 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java

@@ -0,0 +1,42 @@
+package cn.iocoder.yudao.framework.websocket.core.security;
+
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import cn.iocoder.yudao.framework.security.core.filter.TokenAuthenticationFilter;
+import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+/**
+ * 登录用户的 {@link HandshakeInterceptor} 实现类
+ *
+ * 流程如下:
+ * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
+ * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
+ *
+ * @author 芋道源码
+ */
+public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
+
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
+        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
+        if (loginUser != null) {
+            WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
+        }
+        return true;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                               WebSocketHandler wsHandler, Exception exception) {
+        // do nothing
+    }
+
+}

+ 24 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java

@@ -0,0 +1,24 @@
+package cn.iocoder.yudao.framework.websocket.core.security;
+
+import cn.iocoder.yudao.framework.security.config.AuthorizeRequestsCustomizer;
+import cn.iocoder.yudao.framework.websocket.config.WebSocketProperties;
+import lombok.RequiredArgsConstructor;
+import org.springframework.security.config.annotation.web.builders.HttpSecurity;
+import org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer;
+
+/**
+ * WebSocket 的权限自定义
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer {
+
+    private final WebSocketProperties webSocketProperties;
+
+    @Override
+    public void customize(ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry registry) {
+        registry.antMatchers(webSocketProperties.getPath()).permitAll();
+    }
+
+}

+ 49 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java

@@ -0,0 +1,49 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
+import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
+
+/**
+ * {@link WebSocketHandler} 的装饰类,实现了以下功能:
+ *
+ * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理
+ * 2. 封装 {@link WebSocketSession} 支持并发操作
+ *
+ * @author 芋道源码
+ */
+public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {
+
+    /**
+     * 发送时间的限制,单位:毫秒
+     */
+    private static final Integer SEND_TIME_LIMIT = 1000 * 5;
+    /**
+     * 发送消息缓冲上线,单位:bytes
+     */
+    private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;
+
+    private final WebSocketSessionManager sessionManager;
+
+    public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,
+                                            WebSocketSessionManager sessionManager) {
+        super(delegate);
+        this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) {
+        // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
+        session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);
+        // 添加到 WebSocketSessionManager 中
+        sessionManager.addSession(session);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
+        sessionManager.removeSession(session);
+    }
+
+}

+ 53 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java

@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Collection;
+
+/**
+ * {@link WebSocketSession} 管理器的接口
+ *
+ * @author 芋道源码
+ */
+public interface WebSocketSessionManager {
+
+    /**
+     * 添加 Session
+     *
+     * @param session Session
+     */
+    void addSession(WebSocketSession session);
+
+    /**
+     * 移除 Session
+     *
+     * @param session Session
+     */
+    void removeSession(WebSocketSession session);
+
+    /**
+     * 获得指定编号的 Session
+     *
+     * @param id Session 编号
+     * @return Session
+     */
+    WebSocketSession getSession(String id);
+
+    /**
+     * 获得指定用户类型的 Session 列表
+     *
+     * @param userType 用户类型
+     * @return Session 列表
+     */
+    Collection<WebSocketSession> getSessionList(Integer userType);
+
+    /**
+     * 获得指定用户编号的 Session 列表
+     *
+     * @param userType 用户类型
+     * @param userId 用户编号
+     * @return Session 列表
+     */
+    Collection<WebSocketSession> getSessionList(Integer userType, Long userId);
+
+}

+ 113 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java

@@ -0,0 +1,113 @@
+package cn.iocoder.yudao.framework.websocket.core.session;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * 默认的 {@link WebSocketSessionManager} 实现类
+ *
+ * @author 芋道源码
+ */
+public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
+
+    /**
+     * id 与 WebSocketSession 映射
+     *
+     * key:Session 编号
+     */
+    private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();
+
+    /**
+     * user 与 WebSocketSession 映射
+     *
+     * key1:用户类型
+     * key2:用户编号
+     */
+    private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
+            = new ConcurrentHashMap<>();
+
+    @Override
+    public void addSession(WebSocketSession session) {
+        // 添加到 idSessions 中
+        idSessions.put(session.getId(), session);
+        // 添加到 userSessions 中
+        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
+        if (user == null) {
+            return;
+        }
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
+        if (userSessionsMap == null) {
+            userSessionsMap = new ConcurrentHashMap<>();
+            if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
+                userSessionsMap = userSessions.get(user.getUserType());
+            }
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
+        if (sessions == null) {
+            sessions = new CopyOnWriteArrayList<>();
+            if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
+                sessions = userSessionsMap.get(user.getId());
+            }
+        }
+        sessions.add(session);
+    }
+
+    @Override
+    public void removeSession(WebSocketSession session) {
+        // 移除从 idSessions 中
+        idSessions.remove(session.getId(), session);
+        // 移除从 idSessions 中
+        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
+        if (user == null) {
+            return;
+        }
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
+        if (userSessionsMap == null) {
+            return;
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
+        sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
+        if (CollUtil.isEmpty(sessions)) {
+            userSessionsMap.remove(user.getId(), sessions);
+        }
+    }
+
+    @Override
+    public WebSocketSession getSession(String id) {
+        return idSessions.get(id);
+    }
+
+    @Override
+    public Collection<WebSocketSession> getSessionList(Integer userType) {
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
+        if (CollUtil.isEmpty(userSessionsMap)) {
+            return new ArrayList<>();
+        }
+        LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
+        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
+            if (CollUtil.isNotEmpty(sessions)) {
+                continue;
+            }
+            result.addAll(sessions);
+        }
+        return result;
+    }
+
+    @Override
+    public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
+        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
+        if (CollUtil.isEmpty(userSessionsMap)) {
+            return new ArrayList<>();
+        }
+        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
+        return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
+    }
+
+}

+ 59 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java

@@ -0,0 +1,59 @@
+package cn.iocoder.yudao.framework.websocket.core.util;
+
+import cn.iocoder.yudao.framework.security.core.LoginUser;
+import org.springframework.lang.Nullable;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Map;
+
+/**
+ * 专属于 web 包的工具类
+ *
+ * @author 芋道源码
+ */
+public class WebSocketFrameworkUtils {
+
+    public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER";
+
+    /**
+     * 设置当前用户
+     *
+     * @param loginUser 登录用户
+     * @param attributes Session
+     */
+    public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) {
+        attributes.put(ATTRIBUTE_LOGIN_USER, loginUser);
+    }
+
+    /**
+     * 获取当前用户
+     *
+     * @return 当前用户
+     */
+    public static LoginUser getLoginUser(WebSocketSession session) {
+        return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER);
+    }
+
+    /**
+     * 获得当前用户的编号
+     *
+     * @return 用户编号
+     */
+    @Nullable
+    public static Long getLoginUserId(WebSocketSession session) {
+        LoginUser loginUser = getLoginUser(session);
+        return loginUser != null ? loginUser.getId() : null;
+    }
+
+    /**
+     * 获得当前用户的类型
+     *
+     * @return 用户编号
+     */
+    @Nullable
+    public static Integer getLoginUserType(WebSocketSession session) {
+        LoginUser loginUser = getLoginUser(session);
+        return loginUser != null ? loginUser.getUserType() : null;
+    }
+
+}

+ 3 - 0
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java

@@ -1 +1,4 @@
+/**
+ * WebSocket 框架,支持多节点的广播
+ */
 package cn.iocoder.yudao.framework.websocket;

+ 5 - 5
yudao-module-infra/yudao-module-infra-biz/pom.xml

@@ -46,6 +46,11 @@
             <artifactId>yudao-spring-boot-starter-security</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-websocket</artifactId>
+        </dependency>
+
         <!-- DB 相关 -->
         <dependency>
             <groupId>cn.iocoder.boot</groupId>
@@ -116,11 +121,6 @@
             <artifactId>yudao-spring-boot-starter-file</artifactId>
         </dependency>
 
-        <!-- WebSocket -->
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-websocket</artifactId>
-        </dependency>
     </dependencies>
 
 </project>

+ 0 - 45
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java

@@ -1,45 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * 信号量相关处理
- *
- */
-@Slf4j
-public class SemaphoreUtils {
-
-    /**
-     * 获取信号量
-     *
-     * @param semaphore
-     * @return
-     */
-    public static boolean tryAcquire(Semaphore semaphore) {
-        boolean flag = false;
-
-        try {
-            flag = semaphore.tryAcquire();
-        } catch (Exception e) {
-            log.error("获取信号量异常", e);
-        }
-
-        return flag;
-    }
-
-    /**
-     * 释放信号量
-     *
-     * @param semaphore
-     */
-    public static void release(Semaphore semaphore) {
-
-        try {
-            semaphore.release();
-        } catch (Exception e) {
-            log.error("释放信号量异常", e);
-        }
-    }
-}

+ 0 - 16
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java

@@ -1,16 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
-/**
- * websocket 配置
- */
-@Configuration
-public class WebSocketConfig {
-    @Bean
-    public ServerEndpointExporter serverEndpointExporter() {
-        return new ServerEndpointExporter();
-    }
-}

+ 0 - 86
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java

@@ -1,86 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.ServerEndpoint;
-import java.util.concurrent.Semaphore;
-
-/**
- * websocket 消息处理
- */
-@Component
-@ServerEndpoint("/websocket/message")
-@Slf4j
-public class WebSocketServer {
-
-    /**
-     * 默认最多允许同时在线用户数100
-     */
-    public static int socketMaxOnlineCount = 100;
-
-    private static final Semaphore SOCKET_SEMAPHORE = new Semaphore(socketMaxOnlineCount);
-
-    /**
-     * 连接建立成功调用的方法
-     */
-    @OnOpen
-    public void onOpen(Session session) throws Exception {
-        // 尝试获取信号量
-        boolean semaphoreFlag = SemaphoreUtils.tryAcquire(SOCKET_SEMAPHORE);
-        if (!semaphoreFlag) {
-            // 未获取到信号量
-            log.error("当前在线人数超过限制数:{}", socketMaxOnlineCount);
-            WebSocketUsers.sendMessage(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
-            session.close();
-        } else {
-            String userId = WebSocketUsers.getParam("userId", session);
-            if (userId != null) {
-                // 添加用户
-                WebSocketUsers.addSession(userId, session);
-                log.info("用户【userId={}】建立连接,当前连接用户总数:{}", userId, WebSocketUsers.getUsers().size());
-                WebSocketUsers.sendMessage(session, "接收内容:连接成功");
-            } else {
-                WebSocketUsers.sendMessage(session, "接收内容:连接失败");
-            }
-        }
-    }
-
-    /**
-     * 连接关闭时处理
-     */
-    @OnClose
-    public void onClose(Session session) {
-        log.info("用户【sessionId={}】关闭连接!", session.getId());
-        // 移除用户
-        WebSocketUsers.removeSession(session);
-        // 获取到信号量则需释放
-        SemaphoreUtils.release(SOCKET_SEMAPHORE);
-    }
-
-    /**
-     * 抛出异常时处理
-     */
-    @OnError
-    public void onError(Session session, Throwable exception) throws Exception {
-        if (session.isOpen()) {
-            // 关闭连接
-            session.close();
-        }
-        String sessionId = session.getId();
-        log.info("用户【sessionId={}】连接异常!异常信息:{}", sessionId, exception);
-        // 移出用户
-        WebSocketUsers.removeSession(session);
-        // 获取到信号量则需释放
-        SemaphoreUtils.release(SOCKET_SEMAPHORE);
-    }
-
-    /**
-     * 收到客户端消息时调用的方法
-     */
-    @OnMessage
-    public void onMessage(Session session, String message) {
-        WebSocketUsers.sendMessage(session, "接收内容:" + message);
-    }
-}

+ 0 - 178
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java

@@ -1,178 +0,0 @@
-package cn.iocoder.yudao.module.infra.websocket;
-
-import cn.hutool.core.map.MapUtil;
-import cn.hutool.core.util.StrUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.bouncycastle.util.Strings;
-
-import javax.validation.constraints.NotNull;
-import javax.websocket.Session;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * websocket 客户端用户
- */
-@Slf4j
-public class WebSocketUsers {
-
-    /**
-     * 用户集
-     *  TODO 需要登录用户的session?
-     */
-    private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
-
-    /**
-     * 存储用户
-     *
-     * @param userId  唯一键
-     * @param session 用户信息
-     */
-    public static void addSession(String userId, Session session) {
-        SESSION_MAP.put(userId, session);
-    }
-
-    /**
-     * 移除用户
-     *
-     * @param session 用户信息
-     * @return 移除结果
-     */
-    public static boolean removeSession(Session session) {
-        String key = null;
-        boolean flag = SESSION_MAP.containsValue(session);
-        if (flag) {
-            Set<Map.Entry<String, Session>> entries = SESSION_MAP.entrySet();
-            for (Map.Entry<String, Session> entry : entries) {
-                Session value = entry.getValue();
-                if (value.equals(session)) {
-                    key = entry.getKey();
-                    break;
-                }
-            }
-        } else {
-            return true;
-        }
-        return removeSession(key);
-    }
-
-    /**
-     * 移出用户
-     *
-     * @param userId 用户id
-     */
-    public static boolean removeSession(String userId) {
-        log.info("用户【userId={}】退出", userId);
-        Session remove = SESSION_MAP.remove(userId);
-        if (remove != null) {
-            boolean containsValue = SESSION_MAP.containsValue(remove);
-            log.info("用户【userId={}】退出{},当前连接用户总数:{}", userId, containsValue ? "失败" : "成功", SESSION_MAP.size());
-            return containsValue;
-        } else {
-            return true;
-        }
-    }
-
-    /**
-     * 获取在线用户列表
-     *
-     * @return 返回用户集合
-     */
-    public static Map<String, Session> getUsers() {
-        return SESSION_MAP;
-    }
-
-    /**
-     * 向所有在线人发送消息
-     *
-     * @param message 消息内容
-     */
-    public static void sendMessageToAll(String message) {
-        SESSION_MAP.forEach((userId, session) -> {
-            if (session.isOpen()) {
-                sendMessage(session, message);
-            }
-        });
-    }
-
-    /**
-     * 异步发送文本消息
-     *
-     * @param session 用户session
-     * @param message 消息内容
-     */
-    public static void sendMessageAsync(Session session, String message) {
-        if (session.isOpen()) {
-            // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程?
-            session.getAsyncRemote().sendText(message);
-        } else {
-            log.warn("用户【session={}】不在线", session.getId());
-        }
-    }
-
-    /**
-     * 同步发送文本消息
-     *
-     * @param session 用户session
-     * @param message 消息内容
-     */
-    public static void sendMessage(Session session, String message) {
-        try {
-            if (session.isOpen()) {
-                // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程?
-                session.getBasicRemote().sendText(message);
-            } else {
-                log.warn("用户【session={}】不在线", session.getId());
-            }
-        } catch (IOException e) {
-            log.error("发送消息异常", e);
-        }
-
-    }
-
-    /**
-     * 根据用户id发送消息
-     *
-     * @param userId  用户id
-     * @param message 消息内容
-     */
-    public static void sendMessage(String userId, String message) {
-        Session session = SESSION_MAP.get(userId);
-        //判断是否存在该用户的session,并且是否在线
-        if (session == null || !session.isOpen()) {
-            return;
-        }
-        sendMessage(session, message);
-    }
-
-
-    /**
-     * 获取session中的指定参数值
-     *
-     * @param key     参数key
-     * @param session 用户session
-     */
-    public static String getParam(@NotNull String key, Session session) {
-        //TODO 目前只针对获取一个key的值,后期根据情况拓展多个 或者直接在onClose onOpen上获取参数?
-        String value = null;
-        Map<String, List<String>> parameters = session.getRequestParameterMap();
-        if (MapUtil.isNotEmpty(parameters)) {
-            value = parameters.get(key).get(0);
-        } else {
-            String queryString = session.getQueryString();
-            if (!StrUtil.isEmpty(queryString)) {
-                String[] params = Strings.split(queryString, '&');
-                for (String paramPair : params) {
-                    String[] nameValues = Strings.split(paramPair, '=');
-                    if (key.equals(nameValues[0])) {
-                        value = nameValues[1];
-                    }
-                }
-            }
-        }
-        return value;
-    }
-}

+ 1 - 3
yudao-server/src/main/resources/application.yaml

@@ -146,9 +146,7 @@ yudao:
       - /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录
   websocket:
     enable: true # websocket的开关
-    path: /websocket/message # 路径
-    maxOnlineCount: 0 # 最大连接人数
-    sessionMap: true # 保存sessionMap
+    path: /infra/ws # 路径
   swagger:
     title: 芋道快速开发平台
     description: 提供管理后台、用户 App 的所有功能