您好,登錄后才能下訂單哦!
這篇文章給大家介紹python如何使用redis做隊列服務,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
系統中引入消息隊列機制是對系統一個非常大的改善。例如一個web系統中,用戶做了某項操作后需要發送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發送完成后反饋給用戶,但是這樣可能會因為網絡的不確定性造成用戶長時間的等待從而影響用戶體驗。
有些場景下是不可能使用同步方式等待完成的,那些需要后臺花費大量時間的操作。例如極端例子,一個在線編譯系統任務,后臺編譯完成需要30分鐘。這種場景的設計不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據情況再此反饋用戶與否。
另外適用消息隊列的情況是那些系統處理能力有限的情況下,先使用隊列機制把任務暫時存放起來,系統再一個個輪流處理掉排隊的任務。這樣在系統吞吐量不足的情況下也能穩定的處理掉高并發的任務。
消息隊列可以用來做排隊機制,只要系統需要用到排隊機制的地方就可以使用消息隊列來作。
目前成熟的消息隊列產品有很多,著名的例如rabbitmq。它使用起來相對還是比較簡單的,功能也相對比較豐富,一般場合下是完全夠用的。但是有個很煩人的就是它不支持優先級。 例如一個發郵件的任務,某些特權用戶希望它的郵件能夠更加及時的發送出去,至少比普通用戶要優先對待。默認情況下rabbitmq是無法處理掉的,扔給rabbitmq的任務都是FIFO先進先出。但是我們可以使用一些變通的技巧來支持這些優先級。創建多個隊列,并為rabbitmq的消費者設置相應的路由規則。
例如默認情況下有這樣一個隊列,我們拿list來模擬 [task1, task2, task3],消費者輪流按照FIFO的原則一個個拿出task來處理掉。如果有高優先級的任務進來,它也只能跟在最后被處理[task1, task2, task3, higitask1]. 但是如果使用兩個隊列,一個高優先級隊列,一個普通優先級隊列。 普通優先級[task1, task2, task3], 高優先級[hightask1 ] 然后我們設置消費者的路由讓消費者隨機從任意隊列中取數據即可。
并且我們可以定義一個專門處理高優先級隊列的消費者,它空閑的時候也不處理低優先級隊列的數據。這類似銀行的VIP柜臺,普通客戶在銀行取號排隊,一個VIP來了他雖然沒有從取號機里拿出一個排在普通會員前面的票,但是他還是可以更快地直接走VIP通道。
使用rabbitmq來做支持優先級的消息隊列的話,就像是上面所述同銀行VIP會員一樣,走不同的通道。但是這種方式只是相對的優先級,做不到絕對的優先級控制,例如我希望某一個優先級高的任務在絕對意義上要比其他普通任務優先處理掉,這樣上面的方案是行不通的。因為rabbitmq的消費者只知道再自己空閑的情況下從自己關心的隊列中“隨機”取某一個隊列里面的第一個數據來處理,它沒法控制優先取找哪一個隊列。或者更加細粒度的優先級控制。或者你系統里面設置的優先級有10多種。這樣使用rabbitmq也是很難實現的。
但是如果使用redis來做隊列的話上面的需求都可以實現。
首先redis它的設計是用來做緩存的,但是由于它自身的某種特性使得他可以用來做消息隊列。它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓他有做消息隊列的能力。
試想一下在”數據庫解決所有問題“的思路下,不使用消息隊列也是可以完成你的需求的。我們把任務全部存放在數據庫然后通過不斷的輪詢方式來取任務處理。這種做法雖然可以完成你的任務但是做法很粗劣。但是如果你的數據庫接口提供一個阻塞的方法那么就可以避免輪詢操作了,你的數據庫也可以用來做消息隊列,只不過目前的數據庫還沒有這樣的接口。 另外做消息隊列的其他特性例如FIFO也很容易實現,只需要一個List對象從頭取數據,從尾部塞數據即可實現。
redis能做消息隊列得益于他list對象blpop brpop接口以及Pub/Sub(發布/訂閱)的某些接口。他們都是阻塞版的,所以可以用來做消息隊列。
一些基礎redis基礎知識的說明
redis> blpop tasklist 0 "im task 01"
這個例子使用blpop命令會阻塞方式地從tasklist列表中取頭一個數據,最后一個參數就是等待超時的時間。如果設置為0則表示無限等待。另外redis存放的數據都只能是string類型,所以在任務傳遞的時候只能是傳遞字符串。我們只需要簡單的將負責數據序列化成json格式的字符串,然后消費者那邊再轉換一下即可。
這里我們的示例語言使用python,鏈接redis的庫使用redis-py. 如果你有些編程基礎把它切換成自己喜歡的語言應該是沒問題的。
1簡單的FIFO隊列
import redis, time
def handle(info):
print info
time.sleep(20)
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop('task', 0)
handle(result[1])
if __name__ == "__main__":
main()
上例子即使一個最簡單的消費者,我們通過一個無限循環不斷地從redis的隊列中取數據。如果隊列中沒有數據則沒有超時的阻塞在那里,有數據則取出往下執行。
一般情況取出來是個復雜的字符串,我們可能需要將其格式化后作為再傳給處理函數,但是為了簡單我們的例子就是一個普通字符串。另外例子中的處理函數不做任何處理,僅僅sleep 用來模擬耗時的操作。
我們另開一個redis的客戶端來模擬生產者,自帶的客戶端就可以。多往tasklist 隊列里面塞上一些數據。
redis 127.0.0.1:6379> LPUSH task "fuckin1"
(integer) 1
redis 127.0.0.1:6379> LPUSH task "fuckin2"
(integer) 1
redis 127.0.0.1:6379> LPUSH task "fuckin3"
(integer) 2
redis 127.0.0.1:6379> LPUSH task "fuckin4"
(integer) 3
redis 127.0.0.1:6379> LPUSH task "fuckin5"
(integer) 4
redis 127.0.0.1:6379> LPUSH task "fuckin6"
(integer) 5
redis 127.0.0.1:6379> LPUSH task "fuckin7"
(integer) 6
redis 127.0.0.1:6379> LPUSH task "fuckin8"
(integer) 7
redis 127.0.0.1:6379> LPUSH task "fuckin10"
(integer) 8
redis 127.0.0.1:6379> lrange task 0 -1
1) "fuckin10"
2) "fuckin8"
3) "fuckin7"
4) "fuckin6"
5) "fuckin5"
6) "fuckin4"
7) "fuckin3"
可以看到
[root@host-192-168-1-56 soft]# python duilie.py
('task', 'fuckin1')
fuckin1
('task', 'fuckin2') ---每個任務之間間隔20秒,20秒是模擬任務執行時間
fuckin2
('task', 'fuckin3')
fuckin3
('task', 'fuckin4')
fuckin4
('task', 'fuckin5')
.。
。。。
。。。
('task', 'fuckin10')
fuckin10
。。。等待狀態,等待新的任務
假設一種簡單的需求,只需要高優先級的比低優先級的任務率先處理掉。其他任務之間的順序一概不管,這種我們只需要在在遇到高優先級任務的時候將它塞到隊列的前頭,而不是push到最后面即可。
因為我們的隊列是使用的redis的 list,所以很容易實現。遇到高優先級的使用rpush 遇到低優先級的使用lpush
redis> lpush tasklist 'im task 01' redis> lpush tasklist 'im task 02' redis> rpush tasklist 'im high task 01' redis> rpush tasklist 'im high task 01' redis> lpush tasklist 'im task 03' redis> rpush tasklist 'im high task 03'
隨后會看到,高優先級的總是比低優先級的率先執行。但是這個方案的缺點是高優先級的任務之間的執行順序是先進后出的。
例子2中只是簡單的將高優先級的任務塞到隊列最前面,低優先級的塞到最后面。這樣保證不了高優先級任務之間的順序。
假設當所有的任務都是高優先級的話,那么他們的執行順序將是相反的。這樣明顯違背了隊列的FIFO原則。
不過只要稍加改進就可以完善我們的隊列。
跟使用rabbitmq一樣,我們設置兩個隊列,一個高優先級一個低優先級的隊列。高優先級任務放到高隊列中,低的放在低優先隊列中。redis和rabbitmq不同的是它可以要求隊列消費者從哪個隊列里面先讀。
def main(): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool) while 1: result = r.brpop(['high_task_queue', 'low_task_queue'], 0) handle(result[1])
上面的代碼,會阻塞地從'high_task_queue', 'low_task_queue'這兩個隊列里面取數據,如果第一個沒有再從第二個里面取。 所以只需要將隊列消費者做這樣的改進便可以達到目的。
redis> lpush low_task_queue low001 redis> lpush low_task_queue low002 redis> lpush low_task_queue low003 redis> lpush low_task_queue low004 redis> lpush high_task_queue low001 redis> lpush high_task_queue low002 redis> lpush high_task_queue low003 redis> lpush high_task_queue low004
通過上面的測試看到,高優先級的會被率先執行,并且高優先級之間也是保證了FIFO的原則。
這種方案我們可以支持不同階段的優先級隊列,例如高中低三個級別或者更多的級別都可以。
假設有個這樣的需求,優先級不是簡單的高中低或者0-10這些固定的級別。而是類似0-99999這么多級別。那么我們第三種方案將不太合適了。 雖然redis有sorted set這樣的可以排序的數據類型,看是很可惜它沒有阻塞版的接口。于是我們還是只能使用list類型通過其他方式來完成目的。
有個簡單的做法我們可以只設置一個隊列,并保證它是按照優先級排序號的。然后通過二分查找法查找一個任務合適的位置,并通過 lset 命令插入到相應的位置。 例如隊列里面包含著寫優先級的任務[1, 3, 6, 8, 9, 14],當有個優先級為7的任務過來,我們通過自己的二分算法一個個從隊列里面取數據出來反和目標數據比對,計算出相應的位置然后插入到指定地點即可。
因為二分查找是比較快的,并且redis本身也都在內存中,理論上速度是可以保證的。但是如果說數據量確實很大的話我們也可以通過一些方式來調優。
回想我們第三種方案,把第三種方案結合起來就會很大程度上減少開銷。例如數據量十萬的隊列,它們的優先級也是隨機0-十萬的區間。我們可以設置10個或者100個不同的隊列,0-一萬的優先級任務投放到1號隊列,一萬-二萬的任務投放到2號隊列。這樣將一個隊列按不同等級拆分后它單個隊列的數據就減少許多,這樣二分查找匹配的效率也會高一點。但是數據所占的資源基本是不變的,十萬數據該占多少內存還是多少。只是系統里面多了一些隊列而已。
關于python如何使用redis做隊列服務就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。