BoundedQueueController.java 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package com.ruoyi.demo.controller.queue;
  2. import com.ruoyi.common.core.domain.R;
  3. import com.ruoyi.common.utils.redis.QueueUtils;
  4. import io.swagger.v3.oas.annotations.tags.Tag;
  5. import lombok.RequiredArgsConstructor;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. /**
  11. * 有界队列 演示案例
  12. * <p>
  13. * 轻量级队列 重量级数据量 请使用 MQ
  14. * <p>
  15. * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
  16. * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条
  17. *
  18. * @author Lion Li
  19. * @version 3.6.0
  20. */
  21. @Slf4j
  22. @Tag(name = "有界队列 演示案例", description = "有界队列")
  23. @RequiredArgsConstructor
  24. @RestController
  25. @RequestMapping("/demo/queue/bounded")
  26. public class BoundedQueueController {
  27. /**
  28. * 添加队列数据
  29. *
  30. * @param queueName 队列名
  31. * @param capacity 容量
  32. */
  33. @GetMapping("/add")
  34. public R<Void> add(String queueName, int capacity) {
  35. // 用完了一定要销毁 否则会一直存在
  36. boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
  37. log.info("通道: {} , 删除: {}", queueName, b);
  38. // 初始化设置一次即可
  39. if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
  40. log.info("通道: {} , 设置容量: {}", queueName, capacity);
  41. } else {
  42. log.info("通道: {} , 设置容量失败", queueName);
  43. return R.fail("操作失败");
  44. }
  45. for (int i = 0; i < 11; i++) {
  46. String data = "data-" + i;
  47. boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
  48. if (flag == false) {
  49. log.info("通道: {} , 发送数据: {} 失败, 通道已满", queueName, data);
  50. } else {
  51. log.info("通道: {} , 发送数据: {}", queueName, data);
  52. }
  53. }
  54. return R.ok("操作成功");
  55. }
  56. /**
  57. * 删除队列数据
  58. *
  59. * @param queueName 队列名
  60. */
  61. @GetMapping("/remove")
  62. public R<Void> remove(String queueName) {
  63. String data = "data-" + 5;
  64. if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
  65. log.info("通道: {} , 删除数据: {}", queueName, data);
  66. } else {
  67. return R.fail("操作失败");
  68. }
  69. return R.ok("操作成功");
  70. }
  71. /**
  72. * 获取队列数据
  73. *
  74. * @param queueName 队列名
  75. */
  76. @GetMapping("/get")
  77. public R<Void> get(String queueName) {
  78. String data;
  79. do {
  80. data = QueueUtils.getBoundedQueueObject(queueName);
  81. log.info("通道: {} , 获取数据: {}", queueName, data);
  82. } while (data != null);
  83. return R.ok("操作成功");
  84. }
  85. }