Selaa lähdekoodia

[重磅更新] 增加 轻量级 分布式队列 支持

疯狂的狮子li 3 vuotta sitten
vanhempi
commit
08e0ed4fc6

+ 1 - 0
README.md

@@ -33,6 +33,7 @@
 | 序列化框架 | Jackson | [Jackson官网](https://github.com/FasterXML/jackson) | 统一使用 jackson 高效可靠 |
 | Redis客户端 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 支持单机、集群配置 |
 | 分布式限流 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 全局、请求IP、集群ID 多种限流 |
+| 分布式队列 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 普通队列、延迟队列、优先队列 等 |
 | 分布式锁 | Lock4j | [Lock4j官网](https://gitee.com/baomidou/lock4j) | 注解锁、工具锁 多种多样 |
 | 分布式幂等 | Redisson | [Lock4j文档](https://gitee.com/baomidou/lock4j) | 拦截重复提交 |
 | 分布式日志 | TLog | [TLog文档](https://yomahub.com/tlog/docs) | 支持跟踪链路日志记录、性能分析、链路排查 |

+ 215 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java

@@ -0,0 +1,215 @@
+package com.ruoyi.common.utils.redis;
+
+import com.ruoyi.common.utils.spring.SpringUtils;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.redisson.api.*;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * 分布式队列工具
+ * 轻量级队列 重量级数据量 请使用 MQ
+ * 要求 redis 5.X 以上
+ *
+ * @author Lion Li
+ * @version 3.6.0 新增
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class QueueUtils {
+
+    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
+
+
+    /**
+     * 获取客户端实例
+     */
+    public static RedissonClient getClient() {
+        return CLIENT;
+    }
+
+    /**
+     * 添加延迟队列数据 默认毫秒
+     *
+     * @param queueName 队列名
+     * @param data      数据
+     * @param time      延迟时间
+     */
+    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
+        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * 添加延迟队列数据
+     *
+     * @param queueName 队列名
+     * @param data      数据
+     * @param time      延迟时间
+     * @param timeUnit  单位
+     */
+    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        // 已存在则无视
+        if (delayedQueue.contains(data)) {
+            return;
+        }
+        delayedQueue.offer(data, time, timeUnit);
+    }
+
+    /**
+     * 删除延迟队列数据
+     */
+    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        return delayedQueue.remove(data);
+    }
+
+    /**
+     * 销毁延迟队列 所有阻塞监听 报错
+     */
+    public static <T> void destroyDelayedQueue(String queueName) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        delayedQueue.destroy();
+    }
+
+    /**
+     * 尝试设置 优先队列比较器 用于排序优先级
+     *
+     * @param queueName  队列名
+     * @param comparator 比较器
+     */
+    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.trySetComparator(comparator);
+    }
+
+    /**
+     * 尝试设置 优先队列比较器 用于排序优先级
+     *
+     * @param queueName  队列名
+     * @param comparator 比较器
+     * @param destroy    已存在是否销毁
+     */
+    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        if (priorityBlockingQueue.isExists() && destroy) {
+            destroyPriorityQueueObject(queueName);
+        }
+        return priorityBlockingQueue.trySetComparator(comparator);
+    }
+
+    /**
+     * 添加优先队列数据
+     *
+     * @param queueName 队列名
+     * @param data      数据
+     */
+    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.offer(data);
+    }
+
+    /**
+     * 获取一个优先队列数据 没有数据返回 null
+     *
+     * @param queueName 队列名
+     */
+    public static <T> T getPriorityQueueObject(String queueName) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.poll();
+    }
+
+    /**
+     * 删除优先队列数据
+     */
+    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.remove(data);
+    }
+
+    /**
+     * 销毁优先队列
+     */
+    public static boolean destroyPriorityQueueObject(String queueName) {
+        RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.delete();
+    }
+
+    /**
+     * 尝试设置 有界队列 容量 用于限制数量
+     *
+     * @param queueName 队列名
+     * @param capacity  容量
+     */
+    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.trySetCapacity(capacity);
+    }
+
+    /**
+     * 尝试设置 有界队列 容量 用于限制数量
+     *
+     * @param queueName 队列名
+     * @param capacity  容量
+     * @param destroy   已存在是否销毁
+     */
+    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        if (boundedBlockingQueue.isExists() && destroy) {
+            destroyBoundedQueueObject(queueName);
+        }
+        return boundedBlockingQueue.trySetCapacity(capacity);
+    }
+
+    /**
+     * 添加有界队列数据
+     *
+     * @param queueName 队列名
+     * @param data      数据
+     * @return 添加成功 true 已达到界限 false
+     */
+    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.offer(data);
+    }
+
+    /**
+     * 获取一个有界队列数据 没有数据返回 null
+     *
+     * @param queueName 队列名
+     */
+    public static <T> T getBoundedQueueObject(String queueName) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.poll();
+    }
+
+    /**
+     * 删除有界队列数据
+     */
+    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.remove(data);
+    }
+
+    /**
+     * 销毁有界队列
+     */
+    public static boolean destroyBoundedQueueObject(String queueName) {
+        RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.delete();
+    }
+
+    /**
+     * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
+     */
+    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        queue.subscribeOnElements(consumer);
+    }
+
+}

+ 83 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java

@@ -0,0 +1,83 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 有界队列 演示案例
+ * <p>
+ * 轻量级队列 重量级数据量 请使用 MQ
+ * <p>
+ * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
+ * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "有界队列 演示案例", tags = {"有界队列"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/bounded")
+public class BoundedQueueController {
+
+
+    @ApiOperation("添加队列数据")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("队列名") String queueName,
+                                @ApiParam("容量") int capacity) {
+        // 用完了一定要销毁 否则会一直存在
+        boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
+        log.info("通道: {} , 删除: {}", queueName, b);
+        // 初始化设置一次即可
+        if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
+            log.info("通道: {} , 设置容量: {}", queueName, capacity);
+        } else {
+            log.info("通道: {} , 设置容量失败", queueName);
+            return AjaxResult.error("操作失败");
+        }
+        for (int i = 0; i < 11; i++) {
+            String data = "data-" + i;
+            boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
+            if (flag == false) {
+                log.info("通道: {} , 发送数据: {} 失败, 通道已满", queueName, data);
+            } else {
+                log.info("通道: {} , 发送数据: {}", queueName, data);
+            }
+        }
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("删除队列数据")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName) {
+        String data = "data-" + 5;
+        if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
+            log.info("通道: {} , 删除数据: {}", queueName, data);
+        } else {
+            return AjaxResult.error("操作失败");
+        }
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("获取队列数据")
+    @GetMapping("/get")
+    public AjaxResult<Void> get(@ApiParam("队列名") String queueName) {
+        String data;
+        do {
+            data = QueueUtils.getBoundedQueueObject(queueName);
+            log.info("通道: {} , 获取数据: {}", queueName, data);
+        } while (data != null);
+        return AjaxResult.success("操作成功");
+    }
+
+}

+ 79 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java

@@ -0,0 +1,79 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 延迟队列 演示案例
+ * <p>
+ * 轻量级队列 重量级数据量 请使用 MQ
+ * 例如: 创建订单30分钟后过期处理
+ * <p>
+ * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
+ * 集群测试流程 两台集群分别开启订阅 在其中一台发送数据 观察接收消息的规律
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "延迟队列 演示案例", tags = {"延迟队列"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/delayed")
+public class DelayedQueueController {
+
+    @ApiOperation("订阅队列")
+    @GetMapping("/subscribe")
+    public AjaxResult<Void> subscribe(@ApiParam("队列名") String queueName) {
+        log.info("通道: {} 监听中......", queueName);
+        // 项目初始化设置一次即可
+        QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
+            // 观察接收时间
+            log.info("通道: {}, 收到数据: {}", queueName, orderNum);
+        });
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("添加队列数据")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("队列名") String queueName,
+                                @ApiParam("订单号") String orderNum,
+                                @ApiParam("延迟时间(秒)") Long time) {
+        QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
+        // 观察发送时间
+        log.info("通道: {} , 发送数据: {}", queueName, orderNum);
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("删除队列数据")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName,
+                                   @ApiParam("订单号") String orderNum) {
+        if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
+            log.info("通道: {} , 删除数据: {}", queueName, orderNum);
+        } else {
+            return AjaxResult.error("操作失败");
+        }
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("销毁队列")
+    @GetMapping("/destroy")
+    public AjaxResult<Void> destroy(@ApiParam("队列名") String queueName) {
+        // 用完了一定要销毁 否则会一直存在
+        QueueUtils.destroyDelayedQueue(queueName);
+        return AjaxResult.success("操作成功");
+    }
+
+}

+ 19 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java

@@ -0,0 +1,19 @@
+package com.ruoyi.demo.controller.queue;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+/**
+ * 实体类 注意不允许使用内部类 否则会找不到类
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Data
+@Accessors(chain = true)
+@NoArgsConstructor
+public class PriorityDemo {
+    private String name;
+    private Integer orderNum;
+}

+ 16 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java

@@ -0,0 +1,16 @@
+package com.ruoyi.demo.controller.queue;
+
+import java.util.Comparator;
+
+/**
+ * 比较器 注意不允许使用 内部类或匿名类或lambda表达式 会找不到类
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+public class PriorityDemoComparator implements Comparator<PriorityDemo> {
+    @Override
+    public int compare(PriorityDemo pd1, PriorityDemo pd2) {
+        return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum());
+    }
+}

+ 85 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java

@@ -0,0 +1,85 @@
+package com.ruoyi.demo.controller.queue;
+
+import cn.hutool.core.util.RandomUtil;
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 优先队列 演示案例
+ * <p>
+ * 轻量级队列 重量级数据量 请使用 MQ
+ * <p>
+ * 集群测试通过 同一个消息只会被消费一次 做好事务补偿
+ * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "优先队列 演示案例", tags = {"优先队列"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/priority")
+public class PriorityQueueController {
+
+    @ApiOperation("添加队列数据")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("队列名") String queueName) {
+        // 用完了一定要销毁 否则会一直存在
+        boolean b = QueueUtils.destroyPriorityQueueObject(queueName);
+        log.info("通道: {} , 删除: {}", queueName, b);
+        // 初始化设置一次即可 此处注意 不允许用内部类或匿名类
+        boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator());
+        if (flag) {
+            log.info("通道: {} , 设置比较器成功", queueName);
+        } else {
+            log.info("通道: {} , 设置比较器失败", queueName);
+            return AjaxResult.error("操作失败");
+        }
+        for (int i = 0; i < 10; i++) {
+            int randomNum = RandomUtil.randomInt(10);
+            PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum);
+            if (QueueUtils.addPriorityQueueObject(queueName, data)) {
+                log.info("通道: {} , 发送数据: {}", queueName, data);
+            } else {
+                log.info("通道: {} , 发送数据: {}, 发送失败", queueName, data);
+            }
+        }
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("删除队列数据")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName,
+                                   @ApiParam("对象名") String name,
+                                   @ApiParam("排序号") Integer orderNum) {
+        PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum);
+        if (QueueUtils.removePriorityQueueObject(queueName, data)) {
+            log.info("通道: {} , 删除数据: {}", queueName, data);
+        } else {
+            return AjaxResult.error("操作失败");
+        }
+        return AjaxResult.success("操作成功");
+    }
+
+    @ApiOperation("获取队列数据")
+    @GetMapping("/get")
+    public AjaxResult<Void> get(@ApiParam("队列名") String queueName) {
+        PriorityDemo data;
+        do {
+            data = QueueUtils.getPriorityQueueObject(queueName);
+            log.info("通道: {} , 获取数据: {}", queueName, data);
+        } while (data != null);
+        return AjaxResult.success("操作成功");
+    }
+
+}