Browse Source

add 新增 websocket 群发功能

疯狂的狮子Li 1 year ago
parent
commit
3a67a6599f

+ 5 - 0
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java

@@ -5,6 +5,7 @@ import lombok.NoArgsConstructor;
 import org.springframework.web.socket.WebSocketSession;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -31,6 +32,10 @@ public class WebSocketSessionHolder {
         return USER_SESSION_MAP.get(sessionKey);
     }
 
+    public static Set<Long> getSessionsAll() {
+        return USER_SESSION_MAP.keySet();
+    }
+
     public static Boolean existSession(Long sessionKey) {
         return USER_SESSION_MAP.containsKey(sessionKey);
     }

+ 6 - 1
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java

@@ -19,13 +19,18 @@ public class WebSocketTopicListener implements ApplicationRunner, Ordered {
     @Override
     public void run(ApplicationArguments args) throws Exception {
         WebSocketUtils.subscribeMessage((message) -> {
-            log.info("WebSocket主题订阅收到消息session keys={}  message={}!", message.getSessionKeys(), message.getMessage());
+            log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
+            // 如果key不为空就按照key发消息 如果为空就群发
             if (CollUtil.isNotEmpty(message.getSessionKeys())) {
                 message.getSessionKeys().forEach(key -> {
                     if (WebSocketSessionHolder.existSession(key)) {
                         WebSocketUtils.sendMessage(key, message.getMessage());
                     }
                 });
+            } else {
+                WebSocketSessionHolder.getSessionsAll().forEach(key -> {
+                    WebSocketUtils.sendMessage(key, message.getMessage());
+                });
             }
         });
         log.info("初始化WebSocket主题订阅监听器成功");

+ 19 - 3
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java

@@ -1,13 +1,13 @@
 package org.dromara.common.websocket.utils;
 
 import cn.hutool.core.collection.CollUtil;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.dromara.common.core.domain.model.LoginUser;
 import org.dromara.common.redis.utils.RedisUtils;
 import org.dromara.common.websocket.dto.WebSocketMessageDto;
 import org.dromara.common.websocket.holder.WebSocketSessionHolder;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.springframework.web.socket.PongMessage;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketMessage;
@@ -77,6 +77,22 @@ public class WebSocketUtils {
         }
     }
 
+    /**
+     * 发布订阅的消息(群发)
+     *
+     * @param message 消息内容
+     */
+    public static void publishAll(String message) {
+        WebSocketSessionHolder.getSessionsAll().forEach(key -> {
+            WebSocketUtils.sendMessage(key, message);
+        });
+        WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
+        broadcastMessage.setMessage(message);
+        RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
+            log.info(" WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
+        });
+    }
+
     public static void sendPongMessage(WebSocketSession session) {
         sendMessage(session, new PongMessage());
     }