BoundedQueueController.java 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package com.ruoyi.demo.controller.queue;
  2. import com.ruoyi.common.core.domain.R;
  3. import com.ruoyi.common.utils.redis.QueueUtils;
  4. import io.swagger.v3.oas.annotations.Operation;
  5. import io.swagger.v3.oas.annotations.Parameter;
  6. import io.swagger.v3.oas.annotations.tags.Tag;
  7. import lombok.RequiredArgsConstructor;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.web.bind.annotation.GetMapping;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. /**
  13. * 有界队列 演示案例
  14. * <p>
  15. * 轻量级队列 重量级数据量 请使用 MQ
  16. * <p>
  17. * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
  18. * 集群测试流程 在其中一台发送数据 两端分别调用获取接口 一次获取一条
  19. *
  20. * @author Lion Li
  21. * @version 3.6.0
  22. */
  23. @Slf4j
  24. @Tag(name ="有界队列 演示案例", description = "有界队列")
  25. @RequiredArgsConstructor
  26. @RestController
  27. @RequestMapping("/demo/queue/bounded")
  28. public class BoundedQueueController {
  29. @Operation(summary = "添加队列数据")
  30. @GetMapping("/add")
  31. public R<Void> add(@Parameter(name = "队列名") String queueName,
  32. @Parameter(name = "容量") int capacity) {
  33. // 用完了一定要销毁 否则会一直存在
  34. boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
  35. log.info("通道: {} , 删除: {}", queueName, b);
  36. // 初始化设置一次即可
  37. if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
  38. log.info("通道: {} , 设置容量: {}", queueName, capacity);
  39. } else {
  40. log.info("通道: {} , 设置容量失败", queueName);
  41. return R.fail("操作失败");
  42. }
  43. for (int i = 0; i < 11; i++) {
  44. String data = "data-" + i;
  45. boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
  46. if (flag == false) {
  47. log.info("通道: {} , 发送数据: {} 失败, 通道已满", queueName, data);
  48. } else {
  49. log.info("通道: {} , 发送数据: {}", queueName, data);
  50. }
  51. }
  52. return R.ok("操作成功");
  53. }
  54. @Operation(summary = "删除队列数据")
  55. @GetMapping("/remove")
  56. public R<Void> remove(@Parameter(name = "队列名") String queueName) {
  57. String data = "data-" + 5;
  58. if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
  59. log.info("通道: {} , 删除数据: {}", queueName, data);
  60. } else {
  61. return R.fail("操作失败");
  62. }
  63. return R.ok("操作成功");
  64. }
  65. @Operation(summary = "获取队列数据")
  66. @GetMapping("/get")
  67. public R<Void> get(@Parameter(name = "队列名") String queueName) {
  68. String data;
  69. do {
  70. data = QueueUtils.getBoundedQueueObject(queueName);
  71. log.info("通道: {} , 获取数据: {}", queueName, data);
  72. } while (data != null);
  73. return R.ok("操作成功");
  74. }
  75. }