Prechádzať zdrojové kódy

!317 集成websocket功能
* add 增加 ruoyi-common-websocket 模块 支持token鉴权 支持分布式集群消息同步

zendwang 2 rokov pred
rodič
commit
65ae5ab362
15 zmenil súbory, kde vykonal 571 pridanie a 0 odobranie
  1. 8 0
      ruoyi-admin/src/main/resources/application.yml
  2. 6 0
      ruoyi-common/ruoyi-common-bom/pom.xml
  3. 41 0
      ruoyi-common/ruoyi-common-websocket/pom.xml
  4. 62 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/WebSocketConfig.java
  5. 27 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/properties/WebSocketProperties.java
  6. 28 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/constant/WebSocketConstants.java
  7. 29 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/dto/WebSocketMessageDto.java
  8. 104 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/PlusWebSocketHandler.java
  9. 37 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/holder/WebSocketSessionHolder.java
  10. 51 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/interceptor/PlusWebSocketInterceptor.java
  11. 38 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/listener/WebSocketTopicListener.java
  12. 102 0
      ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/utils/WebSocketUtils.java
  13. 1 0
      ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  14. 4 0
      ruoyi-modules/ruoyi-demo/pom.xml
  15. 33 0
      ruoyi-modules/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/WeSocketController.java

+ 8 - 0
ruoyi-admin/src/main/resources/application.yml

@@ -280,3 +280,11 @@ management:
       show-details: ALWAYS
     logfile:
       external-file: ./logs/sys-console.log
+
+--- # websocket
+websocket:
+  enabled: true
+  # 路径
+  path: /websocket
+  # 设置访问源地址
+  allowedOrigins: '*'

+ 6 - 0
ruoyi-common/ruoyi-common-bom/pom.xml

@@ -159,6 +159,12 @@
                 <version>${revision}</version>
             </dependency>
 
+            <!-- WebSocket模块 -->
+            <dependency>
+                <groupId>com.ruoyi</groupId>
+                <artifactId>ruoyi-common-websocket</artifactId>
+                <version>${revision}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

+ 41 - 0
ruoyi-common/ruoyi-common-websocket/pom.xml

@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.ruoyi</groupId>
+        <artifactId>ruoyi-common</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ruoyi-common-websocket</artifactId>
+
+    <description>
+        ruoyi-common-websocket 模块
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-common-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-common-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-common-satoken</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-common-json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+    </dependencies>
+</project>

+ 62 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/WebSocketConfig.java

@@ -0,0 +1,62 @@
+package com.ruoyi.common.websocket.config;
+
+import cn.hutool.core.util.StrUtil;
+import com.ruoyi.common.websocket.config.properties.WebSocketProperties;
+import com.ruoyi.common.websocket.constant.WebSocketConstants;
+import com.ruoyi.common.websocket.handler.PlusWebSocketHandler;
+import com.ruoyi.common.websocket.interceptor.PlusWebSocketInterceptor;
+import com.ruoyi.common.websocket.listener.WebSocketTopicListener;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+/**
+ * WebSocket 配置
+ *
+ * @author zendwang
+ */
+@AutoConfiguration
+@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
+@EnableConfigurationProperties(WebSocketProperties.class)
+@EnableWebSocket
+public class WebSocketConfig {
+
+    @Bean
+    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
+                                                   WebSocketHandler webSocketHandler,
+                                                   WebSocketProperties webSocketProperties) {
+        if (StrUtil.isBlank(webSocketProperties.getPath())) {
+            webSocketProperties.setPath("/websocket");
+        }
+
+        if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) {
+            webSocketProperties.setAllowedOrigins("*");
+        }
+
+        return registry -> registry
+            .addHandler(webSocketHandler, webSocketProperties.getPath())
+            .addInterceptors(handshakeInterceptor)
+            .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
+    }
+
+    @Bean
+    public HandshakeInterceptor handshakeInterceptor() {
+        return new PlusWebSocketInterceptor();
+    }
+
+    @Bean
+    public WebSocketHandler webSocketHandler() {
+        return new PlusWebSocketHandler();
+    }
+
+    @Bean
+    public WebSocketTopicListener topicListener() {
+        return new WebSocketTopicListener();
+    }
+}

+ 27 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/properties/WebSocketProperties.java

@@ -0,0 +1,27 @@
+package com.ruoyi.common.websocket.config.properties;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+/**
+ * WebSocket 配置项
+ *
+ * @author zendwang
+ */
+@ConfigurationProperties("websocket")
+@Data
+public class WebSocketProperties {
+
+    private Boolean enable;
+
+    /**
+     * 路径
+     */
+    private String path;
+
+    /**
+     *  设置访问源地址
+     */
+    private String allowedOrigins;
+}

+ 28 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/constant/WebSocketConstants.java

@@ -0,0 +1,28 @@
+package com.ruoyi.common.websocket.constant;
+
+/**
+ * websocket的常量配置
+ *
+ * @author zendwang
+ */
+public interface WebSocketConstants {
+    /**
+     * websocketSession中的参数的key
+     */
+    String LOGIN_USER_KEY = "loginUser";
+
+    /**
+     * 订阅的频道
+     */
+    String WEB_SOCKET_TOPIC = "global:websocket";
+
+    /**
+     * 前端心跳检查的命令
+     */
+    String PING = "ping";
+
+    /**
+     * 服务端心跳恢复的字符串
+     */
+    String PONG = "pong";
+}

+ 29 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/dto/WebSocketMessageDto.java

@@ -0,0 +1,29 @@
+package com.ruoyi.common.websocket.dto;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 消息的dto
+ *
+ * @author zendwang
+ */
+@Builder
+@Data
+public class WebSocketMessageDto implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 需要推送到的session key 列表
+     */
+    private List<Long> sessionKeys;
+
+    /**
+     * 需要发送的消息
+     */
+    private String message;
+}

+ 104 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/PlusWebSocketHandler.java

@@ -0,0 +1,104 @@
+package com.ruoyi.common.websocket.handler;
+
+import com.ruoyi.common.core.domain.model.LoginUser;
+import com.ruoyi.common.websocket.dto.WebSocketMessageDto;
+import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
+import com.ruoyi.common.websocket.utils.WebSocketUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.*;
+import org.springframework.web.socket.handler.AbstractWebSocketHandler;
+
+import java.util.List;
+
+import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
+
+/**
+ * WebSocketHandler 实现类
+ *
+ * @author zendwang
+ */
+@Slf4j
+public class PlusWebSocketHandler extends AbstractWebSocketHandler {
+
+    /**
+     * 连接成功后
+     *
+     * @param session
+     */
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) {
+        LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
+        WebSocketSessionHolder.addSession(loginUser.getUserId(), session);
+        log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
+    }
+
+    /**
+     * 处理发送来的文本消息
+     *
+     * @param session
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
+        log.info("PlusWebSocketHandler, 连接:" + session.getId() + ",已收到消息:" + message.getPayload());
+        List<Long> userIds = List.of(loginUser.getUserId());
+        WebSocketMessageDto webSocketMessageDto = WebSocketMessageDto.builder()
+            .sessionKeys(userIds).message(message.getPayload()).build();
+        WebSocketUtils.publishMessage(webSocketMessageDto);
+    }
+
+    @Override
+    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
+        super.handleBinaryMessage(session, message);
+    }
+
+    /**
+     * 心跳监测的回复
+     *
+     * @param session
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
+        WebSocketUtils.sendPongMessage(session);
+    }
+
+    /**
+     * 连接出错时
+     *
+     * @param session
+     * @param exception
+     * @throws Exception
+     */
+    @Override
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+        log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
+    }
+
+    /**
+     * 连接关闭后
+     *
+     * @param session
+     * @param status
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
+        LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
+        WebSocketSessionHolder.removeSession(loginUser.getUserId());
+        log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
+    }
+
+    /**
+     * 是否支持分片消息
+     *
+     * @return
+     */
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+
+}

+ 37 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/holder/WebSocketSessionHolder.java

@@ -0,0 +1,37 @@
+package com.ruoyi.common.websocket.holder;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * WebSocketSession 用于保存当前所有在线的会话信息
+ *
+ * @author zendwang
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class WebSocketSessionHolder {
+
+    private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
+
+    public static void addSession(Long sessionKey, WebSocketSession session) {
+        USER_SESSION_MAP.put(sessionKey, session);
+    }
+
+    public static void removeSession(Long sessionKey) {
+        if (USER_SESSION_MAP.containsKey(sessionKey)) {
+            USER_SESSION_MAP.remove(sessionKey);
+        }
+    }
+
+    public static WebSocketSession getSessions(Long sessionKey) {
+        return USER_SESSION_MAP.get(sessionKey);
+    }
+
+    public static Boolean existSession(Long sessionKey) {
+        return USER_SESSION_MAP.containsKey(sessionKey);
+    }
+}

+ 51 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/interceptor/PlusWebSocketInterceptor.java

@@ -0,0 +1,51 @@
+package com.ruoyi.common.websocket.interceptor;
+
+import com.ruoyi.common.core.domain.model.LoginUser;
+import com.ruoyi.common.satoken.utils.LoginHelper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
+
+/**
+ * WebSocket握手请求的拦截器
+ *
+ * @author zendwang
+ */
+@Slf4j
+public class PlusWebSocketInterceptor implements HandshakeInterceptor {
+
+    /**
+     * 握手前
+     *
+     * @param request    request
+     * @param response   response
+     * @param wsHandler  wsHandler
+     * @param attributes attributes
+     * @return 是否握手成功
+     */
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
+        LoginUser loginUser = LoginHelper.getLoginUser();
+        attributes.put(LOGIN_USER_KEY, loginUser);
+        return true;
+    }
+
+    /**
+     * 握手后
+     *
+     * @param request   request
+     * @param response  response
+     * @param wsHandler wsHandler
+     * @param exception 异常
+     */
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
+
+    }
+}

+ 38 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/listener/WebSocketTopicListener.java

@@ -0,0 +1,38 @@
+package com.ruoyi.common.websocket.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
+import com.ruoyi.common.websocket.utils.WebSocketUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+
+/**
+ * WebSocket 主题订阅监听器
+ *
+ * @author zendwang
+ */
+@Slf4j
+public class WebSocketTopicListener implements ApplicationRunner, Ordered {
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        WebSocketUtils.subscribeMessage((message) -> {
+            log.info("WebSocket主题订阅收到消息session keys={}  message={}!", message.getSessionKeys(), message.getMessage());
+            if (CollUtil.isNotEmpty(message.getSessionKeys())) {
+                message.getSessionKeys().forEach(key -> {
+                    if (WebSocketSessionHolder.existSession(key)) {
+                        WebSocketUtils.sendMessage(key, message.getMessage());
+                    }
+                });
+            }
+        });
+        log.info("初始化WebSocket主题订阅监听器成功");
+    }
+
+    @Override
+    public int getOrder() {
+        return -1;
+    }
+}

+ 102 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/utils/WebSocketUtils.java

@@ -0,0 +1,102 @@
+package com.ruoyi.common.websocket.utils;
+
+import cn.hutool.core.collection.CollUtil;
+import com.ruoyi.common.core.domain.model.LoginUser;
+import com.ruoyi.common.json.utils.JsonUtils;
+import com.ruoyi.common.redis.utils.RedisUtils;
+import com.ruoyi.common.satoken.utils.LoginHelper;
+import com.ruoyi.common.websocket.dto.WebSocketMessageDto;
+import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
+import static com.ruoyi.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
+
+/**
+ * 工具类
+ *
+ * @author zendwang
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class WebSocketUtils {
+
+    /**
+     * 发送消息
+     * @param sessionKey
+     * @param message
+     */
+    public static void sendMessage(Long sessionKey, String message) {
+        WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
+        sendMessage(session, message);
+    }
+
+    /**
+     * 订阅消息
+     *
+     * @param consumer
+     */
+    public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
+        RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
+    }
+
+    /**
+     * 发布订阅的消息
+     *
+     * @param webSocketMessage
+     */
+    public static void publishMessage(WebSocketMessageDto webSocketMessage) {
+        List<Long> unsentSessionKeys = new ArrayList<>();
+        // 当前服务内session,直接发送消息
+        for (Long sessionKey: webSocketMessage.getSessionKeys()) {
+            if (WebSocketSessionHolder.existSession(sessionKey)) {
+                WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
+                continue;
+            }
+            unsentSessionKeys.add(sessionKey);
+        }
+        // 不在当前服务内session,发布订阅消息
+        if (CollUtil.isNotEmpty(unsentSessionKeys)) {
+            WebSocketMessageDto broadcastMessage = WebSocketMessageDto.builder()
+                .message(webSocketMessage.getMessage()).sessionKeys(unsentSessionKeys).build();
+            RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage,  consumer -> {
+                log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
+                    WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
+            });
+        }
+    }
+
+    public static void sendPongMessage(WebSocketSession session) {
+        sendMessage(session, new PongMessage());
+    }
+
+    public static void sendMessage(WebSocketSession session, String message) {
+        sendMessage(session, new TextMessage(message));
+    }
+
+    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
+        if (session == null || !session.isOpen()) {
+            log.error("[send] session会话已经关闭");
+        } else {
+            try {
+                // 获取当前会话中的用户
+                LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
+                session.sendMessage(message);
+                log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message);
+            } catch (IOException e) {
+                log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
+            }
+        }
+    }
+}

+ 1 - 0
ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -0,0 +1 @@
+com.ruoyi.common.websocket.config.WebSocketConfig

+ 4 - 0
ruoyi-modules/ruoyi-demo/pom.xml

@@ -94,6 +94,10 @@
             <artifactId>ruoyi-common-tenant</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-common-websocket</artifactId>
+        </dependency>
         <!-- 短信 用哪个导入哪个依赖 -->
 <!--        <dependency>-->
 <!--            <groupId>com.aliyun</groupId>-->

+ 33 - 0
ruoyi-modules/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/WeSocketController.java

@@ -0,0 +1,33 @@
+package com.ruoyi.demo.controller;
+
+import com.ruoyi.common.core.domain.R;
+import com.ruoyi.common.websocket.dto.WebSocketMessageDto;
+import com.ruoyi.common.websocket.utils.WebSocketUtils;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * WebSocket 演示案例
+ *
+ * @author zendwang
+ */
+@RequiredArgsConstructor
+@RestController
+@RequestMapping("/demo/websocket")
+@Slf4j
+public class WeSocketController {
+
+    /**
+     * 发布消息
+     *
+     * @param dto 发送内容
+     */
+    @GetMapping("/send")
+    public R<Void> send(WebSocketMessageDto dto) throws InterruptedException {
+        WebSocketUtils.publishMessage(dto);
+        return R.ok("操作成功");
+    }
+}