DelayedQueueController.java 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package com.ruoyi.demo.controller.queue;
  2. import com.ruoyi.common.core.domain.R;
  3. import com.ruoyi.common.utils.redis.QueueUtils;
  4. import lombok.RequiredArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * 延迟队列 演示案例
  12. * <p>
  13. * 轻量级队列 重量级数据量 请使用 MQ
  14. * 例如: 创建订单30分钟后过期处理
  15. * <p>
  16. * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
  17. * 集群测试流程 两台集群分别开启订阅 在其中一台发送数据 观察接收消息的规律
  18. *
  19. * @author Lion Li
  20. * @version 3.6.0
  21. */
  22. @Slf4j
  23. @RequiredArgsConstructor
  24. @RestController
  25. @RequestMapping("/demo/queue/delayed")
  26. public class DelayedQueueController {
  27. /**
  28. * 订阅队列
  29. *
  30. * @param queueName 队列名
  31. */
  32. @GetMapping("/subscribe")
  33. public R<Void> subscribe(String queueName) {
  34. log.info("通道: {} 监听中......", queueName);
  35. // 项目初始化设置一次即可
  36. QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
  37. // 观察接收时间
  38. log.info("通道: {}, 收到数据: {}", queueName, orderNum);
  39. });
  40. return R.ok("操作成功");
  41. }
  42. /**
  43. * 添加队列数据
  44. *
  45. * @param queueName 队列名
  46. * @param orderNum 订单号
  47. * @param time 延迟时间(秒)
  48. */
  49. @GetMapping("/add")
  50. public R<Void> add(String queueName, String orderNum, Long time) {
  51. QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
  52. // 观察发送时间
  53. log.info("通道: {} , 发送数据: {}", queueName, orderNum);
  54. return R.ok("操作成功");
  55. }
  56. /**
  57. * 删除队列数据
  58. *
  59. * @param queueName 队列名
  60. * @param orderNum 订单号
  61. */
  62. @GetMapping("/remove")
  63. public R<Void> remove(String queueName, String orderNum) {
  64. if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
  65. log.info("通道: {} , 删除数据: {}", queueName, orderNum);
  66. } else {
  67. return R.fail("操作失败");
  68. }
  69. return R.ok("操作成功");
  70. }
  71. /**
  72. * 销毁队列
  73. *
  74. * @param queueName 队列名
  75. */
  76. @GetMapping("/destroy")
  77. public R<Void> destroy(String queueName) {
  78. // 用完了一定要销毁 否则会一直存在
  79. QueueUtils.destroyDelayedQueue(queueName);
  80. return R.ok("操作成功");
  81. }
  82. }