|
@@ -61,17 +61,17 @@ public class RedisPendingMessageResendJob {
|
|
|
|
|
|
listeners.forEach(listener -> {
|
|
|
PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName);
|
|
|
- // 每个消费者的pending消息数量
|
|
|
+ // 每个消费者的 pending 队列消息数量
|
|
|
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
|
|
|
pendingMessagesPerConsumer.entrySet().forEach(entry -> {
|
|
|
String consumerName = entry.getKey();
|
|
|
Long pendingMessageCount = entry.getValue();
|
|
|
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
|
|
|
|
|
|
- // 从消费者的pending队列中读取消息
|
|
|
- List<MapRecord<String, Object, Object>> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0")));
|
|
|
- if (CollUtil.isNotEmpty(retVal)) {
|
|
|
- for (MapRecord<String, Object, Object> record : retVal) {
|
|
|
+ // 从消费者的 pending 队列中读取消息
|
|
|
+ List<MapRecord<String, Object, Object>> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0")));
|
|
|
+ if (CollUtil.isNotEmpty(records)) {
|
|
|
+ for (MapRecord<String, Object, Object> record : records) {
|
|
|
// 重新投递消息
|
|
|
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
|
|
|
.ofObject(record.getValue()) // 设置内容
|