您好,登錄后才能下訂單哦!
在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
消費組從stream中獲取到消息后,會分配給自己組中其中的一個消費者進行消費,消費者消費完畢,需要給消費組返回ACK,表示這條消息已經消費完畢了。
當消費者從消費組獲取到消息的時候,會先把消息添加到自己的pending消息列表,當消費者給消費組返回ACK的時候,就會把這條消息從pending隊列刪除。(每個消費者都有自己的pending消息隊列)
消費者可能沒有及時的返回ACK。例如消費者消費完畢后,宕機,沒有及時返回ACK,此時就會導致這條消息占用2倍的內存(stream中保存一份, 消費者的的pending消息列表中保存一份)
XADD my_stream * hello world
隨便添加一條消息,目的是為了初始化stream
XGROUP CREATE my_stream my_group $
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
XREADGROUP GROUP my_group my_consumer2 BLOCK 0 STREAMS my_stream >
XADD my_stream * message1 Hello XADD my_stream * message2 SpringBoot XADD my_stream * message3 Community
可以看到,一共Push了3條消息,它們的ID分別是
1605524648266-0 (message1 )
1605524657157-0 (message2)
1605524665215-0 (message3)
現在的狀況是,消費者1,消費了2條消息(message1和message3),消費者2,消費了1條消息(message2)。都是消費成功了的,但是它們都還沒有進行ACK。
在客戶端,消費者消費到一條消息后會立即返回,需要重新執行命令,來回到阻塞狀態
現在我們打算,把消費者1,消費的那條message1
進行ACK
XACK my_stream my_group 1605524648266-0
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 2 # 消費組中,所有消費者的pending消息數量 2) "1605524657157-0" # pending消息中的,最小消息ID 3) "1605524665215-0" # pending消息中的,最大消息ID 4) 1) 1) "my_consumer1" # 消費者1 2) "1" # 有1條待確認消息 2) 1) "my_consumer2" # 消費者2 2) "1" # 有2條待確認消息
127.0.0.1:6379> XPENDING my_stream my_group 0 + 10 my_consumer1 1) 1) "1605524665215-0" # 待ACK消息ID 2) "my_consumer1" # 所屬消費者 3) (integer) 847437 # 消息自從被消費者獲取后到現在過去的時間(毫秒) - idle time 4) (integer) 1 # 消息被獲取的次數 - delivery counter
這條命令,表示查詢消費組my_group
中消費者my_consumer1
的opending隊列,開始ID是0,結束ID是最大,最多檢索10個結果。
現在的情況就是,一共3條消息,消費者1消費了2條,ack了1條。消費者2消費了1條,沒有ack。消費者1和2,各自的pending隊列中都有一條未ack的消息
如何實現將未被成功消費的消息獲取出來重新進行消費?之前的演示,目的都是為了造一些數據,所以是用的客戶端命令,從這里開始,所有的演示,都會使用spring-data-redis
中的RedisTemplate
。
import java.time.Duration; import java.util.List; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; import org.springframework.data.redis.connection.stream.PendingMessagesSummary; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @Test public void test() { StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream(); // 獲取my_group中的pending消息信息,本質上就是執行XPENDING指令 PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group"); // 所有pending消息的數量 long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages(); // 消費組名稱 String groupName= pendingMessagesSummary.getGroupName(); // pending隊列中的最小ID String minMessageId = pendingMessagesSummary.minMessageId(); // pending隊列中的最大ID String maxMessageId = pendingMessagesSummary.maxMessageId(); LOGGER.info("消費組:{},一共有{}條pending消息,最大ID={},最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId); // 每個消費者的pending消息數量 Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.entrySet().forEach(entry -> { // 消費者 String consumer = entry.getKey(); // 消費者的pending消息數量 long consumerTotalPendingMessages = entry.getValue(); LOGGER.info("消費者:{},一共有{}條pending消息", consumer, consumerTotalPendingMessages); if (consumerTotalPendingMessages > 0) { // 讀取消費者pending隊列的前10條記錄,從ID=0的記錄開始,一直到ID最大值 PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10); // 遍歷所有Opending消息的詳情 pendingMessages.forEach(message -> { // 消息的ID RecordId recordId = message.getId(); // 消息從消費組中獲取,到此刻的時間 Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery(); // 消息被獲取的次數 long deliveryCount = message.getTotalDeliveryCount(); LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount); /** * 演示手動消費的這個判斷非常的針對,目的就是要讀取消費者“my_consumer1”pending消息中,ID=1605524665215-0的這條消息 */ if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) { // 通過streamOperations,直接讀取這條pending消息, List<MapRecord<String, String, String>> result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0")); // 開始和結束都是同一個ID,所以結果只有一條 MapRecord<String, String, String> record = result.get(0); // 這里執行日志輸出,模擬的就是消費邏輯 LOGGER.info("消費了pending消息:id={}, value={}", record.getId(), record.getValue()); // 如果手動消費成功后,往消費組提交消息的ACK Long retVal = streamOperations.acknowledge("my_group", record); LOGGER.info("消息ack,一共ack了{}條", retVal); } }); } }); } }
這種方式就是,遍歷消費組的pending消息情況,再遍歷每個消費者的pending消息id列表,再根據id,直接去stream讀取這條消息,進行消費Ack。
消費組:my_group,一共有2條pending消息,最大ID=1605524657157-0,最小ID=1605524665215-0 消費者:my_consumer1,一共有1條pending消息 openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1 消費了pending消息:id=1605524665215-0, value={message3=Community} 消息ack,一共ack了1條 消費者:my_consumer2,一共有1條pending消息 openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1
最終的結果就是,消費者1的唯一一條pending消息被Ack了,這里有幾個點要注意
遍歷消費者pending列表時候,最小/大消息id,可以根據XPENDING
指令中的結果來,我寫0 - +
,只是為了偷懶
遍歷到消費者pending消息的時候,可以根據elapsedTimeSinceLastDelivery
(idle time)和deliveryCount
(delivery counter)做一些邏輯判斷,elapsedTimeSinceLastDelivery
越長,表示這條消息被消費了很久,都沒Ack,deliveryCount
表示重新投遞N次后(下文會講),都沒被消費成功,可能是消費邏輯有問題,或者是Ack有問題。
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 1 2) "1605524657157-0" 3) "1605524657157-0" 4) 1) 1) "my_consumer2" 2) "1"
消費者1,唯1條待ack的消息看,已經被我們遍歷出來手動消費,手動ack了,所以只剩下消費者2還有1條pending消息。。
如果一個消費者,一直不能消費掉某條消息,或者說這個消費者因為某些消息,永遠也不能上過線了,那么可以把這個消費者的pending消息,轉移到其他的消費者pending列表中,重新消費
其實我們這里要做的事情,就是把“消費者2”的唯一1條pending消息“ 1605524657157-0”(message2),交給“消費者1”,重新進行消費。
XCLAIM my_stream my_group my_consumer1 10000 1605524657157-0
把1605524657157-0
這條消息,重新給my_group
中的my_consumer1
進行消費,前提條件是這條消息的idle time
大于了10秒鐘(從獲取消息到現在超過10秒都沒Ack)。
import java.time.Duration; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @Test public void test() { List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() { @Override public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException { // XCLAIM 指令的實現方法 return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0")); } }); for (ByteRecord byteRecord : retVal) { LOGGER.info("改了消息的消費者:id={}, value={}", byteRecord.getId(), byteRecord.getValue()); } } }
改了消息的消費者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 1 2) "1605524657157-0" 3) "1605524657157-0" 4) 1) 1) "my_consumer1" 2) "1"
可以看到,消息 “1605524657157-0”(message2),已經從“消費者2”名下,轉移到了”消費者1”,接下來要做的事情,就是遍歷“消費者1”的pending列表,消費掉它。
最開始在控制,演示了通過客戶端,進行消費者阻塞消費的時候,寫了一條命令
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
其中最后那個>
,表示ID,是一個特殊字符,如果不是,當ID不是特殊字符>
時, XREADGROUP
不再是從消息隊列中讀取消息, 而是從消費者的的pending消息列表中讀取歷史消息。(一般將參數設為0-0,表示讀取所有的pending消息)
127.0.0.1:6379> XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream 0 1) 1) "my_stream" 2) 1) 1) "1605524657157-0" 2) 1) "message2" 2) "SpringBoot"
讀取到了,消費者1,pending消息中的唯一一條消息記錄
import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @SuppressWarnings("unchecked") @Test public void test() { StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream(); // 從消費者的pending隊列中讀取消息 List<MapRecord<String, String, String>> retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0"))); // 遍歷消息 for (MapRecord<String, String, String> record : retVal ) { // 消費消息 LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue()); // 手動ack消息 streamOperations.acknowledge("my_group", record); } } }
這種方式,就是直接從消費者的pending隊列中讀取數據,手動進行消費,然后Ack
消息id=1605524657157-0, 消息value={message2=SpringBoot}
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
沒了,一條都沒,全部已經Ack了。
死信,就是一直沒法被消費的消息,可以根據這個兩個屬性idle time
和delivery counter
進行判斷
idle time
當消息被消費者讀取后,就會開始計時,如果一個pending消息的idle time
很長,表示這消息,可能是在Ack時發生了異常,或者說還沒來得及Ack,消費者就宕機了,導致一直沒有被Ack,當消息發生了轉移,它會清零,重新計時。
delivery counter
,它表示轉移的次數,每當一條消息的消費者發生變更的時候,它的值都會+1,如果一條pending消息的delivery counter
值很大,表示它在多個消費者之間進行了多次轉移都沒法成功消費,可以人工的讀取,消費掉。
redis5的stream,可以說功能還是蠻強大(設計上狠狠借鑒了一把Kakfa)。如果應用規模并不大,需要一個MQ服務,我想Stream的你可以試試看,比起自己搭建kakfa,RocketMQ之類的,來的快當而且更好維護。
看完上述內容,你們掌握在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。