您好,登錄后才能下訂單哦!
這篇文章主要介紹Redis中Stream類型怎么用,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
最近在看redis
這方面的知識,發現在redis5
中產生了一種新的數據類型Stream
,它和kafka
的設計有些類似,可以當作一個簡單的消息隊列來使用。
是可持久化的,可以保證數據不丟失。
支持消息的多播、分組消費。
支持消息的有序性。
解釋:
消費者組:
Consumer Group,即使用 XGROUP CREATE
命令創建的,一個消費者組中可以存在多個消費者,這些消費者之間是競爭
關系。
同一條消息,只能被這個消費者組中的某個消費者獲取。
多個消費者之間是相互獨立的,互不干擾。
消費者:
Consumer 消費消息。
last_delivered_id:
這個id保證了在同一個消費者組中,一個消息只能被一個消費者獲取。每當消費者組的某個消費者讀取到了這個消息后,這個last_delivered_id的值會往后移動一位,保證消費者不會讀取到重復的消息。
pending_ids
:記錄了消費者讀取到的消息id列表,但是這些消息可能還沒有處理,如果認為某個消息處理,需要調用ack
命令。這樣就確保了某個消息一定會被執行一次。
消息內容:
是一個鍵值對
的格式。
Stream 中 消息的 ID:
默認情況下,ID使用 *
,redis可以自動生成一個,格式為 時間戳-序列號
,也可以自己指定,一般使用默認生成的即可,且后生成的id號要比之前生成的大。
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
xadd 命令 返回的是數據的id, xx-yy (xx指的是毫秒數,yy指的是在這個毫秒內的第幾條消息)
1、向流中增加一條數據,
127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key這個流中增加一個 username 是zhangsan的數據 *表示自動生成id "1635999858912-0" # 返回的是ID 127.0.0.1:6379> keys * 1) "stream-key" # 可以看到stream自動創建了 127.0.0.1:6379>
2、向流中增加數據,不自動創建流
127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因為指定了nomkstream參數,而not-exists-stream之前不存在,所以加入失敗 (nil) 127.0.0.1:6379> keys * (empty array) 127.0.0.1:6379>
3、手動指定ID的值
127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此處id的值是自己傳遞的1-1,而不是使用*自動生成 "1-1" # 返回的是id的值 127.0.0.1:6379>
4、設置一個固定大小的Stream1、精確指定Stream的大小
指定指定Stream的大小比模糊指定Stream的大小會稍微多少消耗一些性能。
2、模糊指定Stream的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first "1636001034141-0" 127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second "1636001044506-0" 127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third "1636001057846-0" 127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636001057846-0" 9) "groups" 10) (integer) 0 11) "first-entry" 12) 1) "1636001034141-0" 2) 1) "first" 2) "first" 13) "last-entry" 14) 1) "1636001057846-0" 2) 1) "third" 2) "third" 127.0.0.1:6379>
~
模糊指定流的大小,可以看到指定的是1,實際上已經到了3.
xrange key start end [COUNT count]
127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> xadd stream-key * username zhangsan QUEUED 127.0.0.1:6379(TX)> xadd stream-key * username lisi QUEUED 127.0.0.1:6379(TX)> exec 1) "1636003481706-0" 2) "1636003481706-1" 127.0.0.1:6379> xadd stream-key * username wangwu "1636003499055-0" 127.0.0.1:6379>
使用redis的事務操作,獲取到同一毫秒產生的多條數據,時間戳一樣,序列號不一樣
1、獲取所有的數據(-
和+
的使用)
127.0.0.1:6379> xrange stream-key - + 1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636003481706-1" 2) 1) "username" 2) "lisi" 3) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>
-:
表示最小id的值
+:
表示最大id的值
2、獲取指定id范圍內的數據,閉區間
127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0 1) 1) "1636003481706-1" 2) 1) "username" 2) "lisi" 2) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>
3、獲取指定id范圍內的數據,開區間
127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0 1) 1) "1636003481706-1" 2) 1) "username" 2) "lisi" 127.0.0.1:6379>
(:
表示開區間
4、獲取某個毫秒后所有的數據
127.0.0.1:6379> xrange stream-key 1636003481706 + 1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636003481706-1" 2) 1) "username" 2) "lisi" 3) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>
直接寫毫秒
不寫后面的序列號即可。
5、獲取單條數據
127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0 1) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>
start
和end
的值寫的一樣即可獲取單挑數據。
6、獲取固定條數的數據
127.0.0.1:6379> xrange stream-key - + count 1 1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan" 127.0.0.1:6379>
使用 count
進行限制
XREVRANGE key end start [COUNT count]
使用方式和XRANGE
類似,略。
xdel key ID [ID ...]
127.0.0.1:6379> xadd stream-key * username zhangsan "1636004176924-0" 127.0.0.1:6379> xadd stream-key * username lisi "1636004183638-0" 127.0.0.1:6379> xadd stream-key * username wangwu "1636004189211-0" 127.0.0.1:6379>
需求:往Stream中加入3條消息,然后刪除第2條消息
127.0.0.1:6379> xdel stream-key 1636004183638-0 (integer) 1 # 返回的是刪除記錄的數量 127.0.0.1:6379> xrang stream -key - + 127.0.0.1:6379> xrange stream-key - + 1) 1) "1636004176924-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636004189211-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>
注意:
需要注意的是,我們從Stream中刪除一個消息,這個消息并不是被真正的刪除了,而是被標記為刪除,這個時候這個消息還是占據著內容空間的。如果所有Stream中所有的消息都被標記刪除,這個時候才會回收內存空間。但是這個Stream并不會被刪除。
xlen key
查看Stream中元素的長度
127.0.0.1:6379> xadd stream-key * username zhangsan "1636004690578-0" 127.0.0.1:6379> xlen stream-key (integer) 1 127.0.0.1:6379> xlen not-exists-stream-key (integer) 0 127.0.0.1:6379>
注意:
如果xlen
后方的key
不存在則返回0,否則返回元素的個數。
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
127.0.0.1:6379> xadd stream-key * username zhangsan "1636009745401-0" 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> xadd stream-key * username lisi QUEUED 127.0.0.1:6379(TX)> xadd stream-key * username wangwu QUEUED 127.0.0.1:6379(TX)> exec 1) "1636009763955-0" 2) "1636009763955-1" 127.0.0.1:6379> xadd stream-key * username zhaoliu "1636009769625-0" 127.0.0.1:6379>
1、maxlen精確限制
127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2個消息 (integer) 2 127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2個消息被刪除了 1) 1) "1636009763955-1" 2) 1) "username" 2) "wangwu" 2) 1) "1636009769625-0" 2) 1) "username" 2) "zhaoliu" 127.0.0.1:6379>
上方的意思是,保留stream-key
這個Stream中最后的2個消息。
2、minid模糊限制
minid 是刪除比這個id小的數據,本地測試的時候沒有測試出來
,略。
XREAD
只是讀取消息,讀取完之后并不會刪除消息。 使用XREAD
讀取消息,是完全獨立與消費者組的,多個客戶端可以同時讀取消息。
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
127.0.0.1:6379> xadd stream-key * username zhangsan "1636011801365-0" 127.0.0.1:6379> xadd stream-key * username lisi "1636011806261-0" 127.0.0.1:6379> xadd stream-key * username wangwu "1636011810905-0" 127.0.0.1:6379>
1、獲取用戶名是wangwu的數據
127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此處寫的是lisi的id,即讀取到的數據需要是 > 1636011806261-0 1) 1) "stream-key" 2) 1) 1) "1636011810905-0" 2) 1) "username" 2) "wangwu"
2、獲取2條數據
127.0.0.1:6379> xread count 2 streams stream-key 0-0 1) 1) "stream-key" 2) 1) 1) "1636011801365-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636011806261-0" 2) 1) "username" 2) "lisi" 127.0.0.1:6379>
count
限制單次讀取最后的消息,因為當前讀取可能沒有這么多。
3、非阻塞讀取Stream對尾的數據
即讀取隊列尾的下一個消息,在非阻塞模式下始終是nil
127.0.0.1:6379> xread streams stream-key $ (nil)
4、阻塞讀取Stream對尾的數據
注意:
$
表示讀取隊列最新進來的一個消息,不是Stream的最后一個消息。是xread block
執行后,再次使用xadd
添加消息后,xread block
才會返回。
block 0
表示永久阻塞,當消息到來時,才接觸阻塞。block 1000
表示阻塞1000ms,如果1000ms還沒有消息到來,則返回nil
xread進行順序消費
當使用xread進行順序消息時,需要記住返回的消息id,同時下次調用xread時,需要將上次返回的消息id傳遞進去。
xread
讀取消息,完全無視消費組,此時Stream
就可以理解為一個普通的list。
1、創建Stream的名稱是 stream-key
2、創建2個消息,aa和bb
127.0.0.1:6379> xadd stream-key * aa aa "1636362619125-0" 127.0.0.1:6379> xadd stream-key * bb bb "1636362623191-0"
1、創建一個從頭開始消費的消費者組
xgroup create stream-key(Stream 名) g1(消費者組名) 0-0(表示從頭開始消費)
2、創建一個從Stream最新的一個消息消費的消費者組
xgroup create stream-key g2 $
$
表示從最后一個元素消費,不包括Stream中的最后一個元素,即消費最新的消息。
xgroup create stream-key g3 1636362619125-0 #1636362619125-0 這個是上方aa消息的id的值
1636362619125-0
某個消息的具體的ID,這個g3
消費者組中的消息都是大于>
這個id的消息。
3、從消費者中讀取消息
127.0.0.1:6379> xreadgroup group g1(消費組名) c1(消費者名,自動創建) count 3(讀取3條) streams stream-key(Stream 名) >(從該消費者組中還未分配給另外的消費者的消息開始讀取) 1) 1) "stream-key" 2) 1) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key > (nil) # 返回 nil 是因為 g2消費組是從最新的一條信息開始讀取(創建消費者組時使用了$),需要在另外的窗口執行`xadd`命令,才可以再次讀取到消息 127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只讀取到一條消息是因為,在創建消費者組時,指定了aa消息的id,bb消息的id大于aa,所以讀取出來了。 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379>
4、讀取消費者的pending消息
127.0.0.1:6379> xgroup create stream-key g4 0-0 OK 127.0.0.1:6379> xinfo consumers stream-key g1 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 88792 127.0.0.1:6379> xinfo consumers stream-key g4 (empty array) 127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) (empty array) 127.0.0.1:6379>
5、轉移消費者的消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c1 1) 1) "1636362619125-0" 2) "c1" 3) (integer) 2686183 4) (integer) 1 2) 1) "1636362623191-0" 2) "c1" 3) (integer) 102274 4) (integer) 7 127.0.0.1:6379> xpending stream-key g1 - + 10 c2 (empty array) 127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xpending stream-key g1 - + 10 c2 1) 1) "1636362623191-0" 2) "c2" 3) (integer) 17616 4) (integer) 8 127.0.0.1:6379>
也可以通過xautoclaim
來實現。
1、查看消費組中消費者的pending消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c2 1) 1) "1636362623191-0" 2) "c2" 3) (integer) 1247680 4) (integer) 8 127.0.0.1:6379>
2、查看消費組中的消費者信息
127.0.0.1:6379> xinfo consumers stream-key g1 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 1474864 2) 1) "name" 2) "c2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 1290069 127.0.0.1:6379>
3、查看消費組信息
127.0.0.1:6379> xinfo groups stream-key 1) 1) "name" 2) "g1" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1636362623191-0" 2) 1) "name" 2) "g2" 3) "consumers" ......
4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 2 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636362623191-0" 9) "groups" 10) (integer) 4 11) "first-entry" 12) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 13) "last-entry" 14) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379>
以上是“Redis中Stream類型怎么用”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。