您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關 Celery與RabbitMQ怎么在Python中使用,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
Broker
Broker(RabbitMQ)負責創建任務隊列,根據一些路由規則將任務分派到任務隊列,然后將任務從任務隊列交付給worker
Consumer (Celery Workers)
Consumer是執行任務的一個或多個Celery workers。可以根據用例啟動許多workers
Result Backend
后端用于存儲任務的結果。但是,它不是必需的元素,如果不在設置中包含它,就無法訪問任務的結果
??首先,需要安裝好Celery,可以使用PyPI:
pip install celery
??為什么我們需要broker呢?這是因為Celery本身并不構造消息隊列,所以它需要一個額外的消息傳輸來完成這項工作。這里可以將Celery看作消息代理的包裝器
實際上,也可以從幾個不同的代理中進行選擇,比如RabbitMQ、Redis或數據庫(例如Django數據庫)
在這里使用RabbitMQ作為代理,因為它功能完整、穩定,Celery推薦使用它。由于演示我的環境是在Mac OS中,安裝RabbitMQ使用Homebrew即可:
brew install rabbitmq #如果是Ubuntu的話使用apt-get安裝
??程序將在/usr/local/sbin中安裝RabbitMQ,雖然有些系統可能會有所不同。可以將此路徑添加到環境變量路徑,以便以后方便地使用。例如,打開shell啟動文件~/.bash_profile添加:
PATH=$PATH:/usr/local/sbin
現在,可以使用rabbitmq-server命令啟動我們的RabbitMQ服務器。檢查RabbitMQ服務器成功啟動,將看到類似的輸出:
??RabbitMQ使用Celery之前,需要對RabbitMQ進行一些配置。簡單地說,我們需要創建一個虛擬主機和用戶,然后設置用戶權限,以便它可以訪問虛擬主機
# 添加用戶跟密碼 $ rabbitmqctl add_user test test123 # 添加虛擬主機 $ rabbitmqctl add_vhost test_vhost # 為用戶添加標簽 $ rabbitmqctl set_user_tags test test_tag # 設置用戶權限 $ rabbitmqctl set_permissions -p test_vhost test ".*" ".*" ".*"
敲黑板!RabbitMQ中有三種操作:配置、寫入和讀取
上面命令末尾的字符串表示用戶test將擁有所有配置、寫入和讀取權限
現在讓我們創建一個簡單的項目來演示Celery的使用
在celery.py中添加以下代碼:
from __future__ import absolute_import from celery import Celery app = Celery('test_celery', broker='amqp://test:test123@localhost/test_vhost', backend='rpc://', include=['test_celery.tasks'])
在這里,初始化了一個名為app的Celery實例,將用于創建一個任務。Celery的第一個參數只是項目包的名稱,即“test_celery”。
broker參數指定代理URL,對于RabbitMQ,傳輸是amqp。
后端參數指定后端URL。Celery中的后端用于存儲任務結果。因此,如果需要在任務完成時訪問任務的結果,應該為Celery設置一個后端。
rpc意味著將結果作為AMQP消息發送回去,這對本次演示來說是一種可接受的格式
include參數指定了在Celery工作程序啟動時要導入的模塊列表。我們在這里添加了tasks模塊,以便找到我們的任務。
在tasks.py這個文件中,定義了我們的任務add_longtime:
from __future__ import absolute_import from test_celery.celery import app import time @app.task def add_longtime(a, b): print 'long time task begins' # sleep 5 seconds time.sleep(5) print 'long time task finished' return a + b
可以看到,導入了在前面的Celery模塊中定義的應用程序,并將其用作任務方法的裝飾器。另外注意!app.task只是一個裝飾器。此外,我們在add_longtime任務中休眠5秒,以模擬一個耗時較長的Task
在設置好Celery之后,我們需要開始運行任務,它包含在runs_tasks.py:
from .tasks import add_longtime import time if __name__ == '__main__': result = add_longtime.delay(1,2) #此時,任務還未完成,它將返回False print 'Task finished? ', result.ready() print 'Task result: ', result.result # 延長到10秒以確保任務已經完成 time.sleep(10) # 現在任務完成,ready方法將返回True print 'Task finished? ', result.ready() print 'Task result: ', result.result
這里,我們使用delay方法調用任務add_longtime,如果我們想異步處理任務,就需要使用delay方法。此外,保存任務的結果并打印一些信息。如果任務已經完成,ready方法將返回True,否則返回False。result屬性是任務的結果,如果任務尚未完成,則返回None。
現在,可以使用下面的命令啟動Celery(注:在項目文件夾中運行):
celery -A test_celery worker --loglevel=info
Celery成功連接到RabbitMQ,你會看到這樣的東西:
再項目文件中輸入以下命令運行它:
python -m test_celery.run_tasks
查看Celery控制臺,看到運行任務:
[2020-05-15 17:15:21,508: INFO/MainProcess]
Received task: test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3]
[2020-05-15 17:15:21,508: WARNING/Worker-3] long time task begins
[2020-05-15 17:15:31,510: WARNING/Worker-3] long time task finished
[2020-05-15 17:15:31,512: INFO/MainProcess]
Task test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3
當Celery收到一個任務,它打印出任務名稱與任務id(在括號中):
Received task: test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5]
在這一行下面是我們的任務add_longtime打印的兩行,時間延遲為5秒:
long time task begins long time task finished
最后一行顯示我們的任務在5秒內完成,任務結果為3:
Task test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3
在當前控制臺中,您將看到以下輸出:
Flower是一款基于網絡的Celery實時監控軟件。使用Flower,可以輕松地監視任務進度和歷史記錄
使用pip來安裝Flower:
pip install flower
要啟動Flower web控制臺,需要運行以下命令:
celery -A test_celery flower
Flower將運行具有默認端口5555的服務器,可以通過http://localhost:5555訪問web控制臺
上述就是小編為大家分享的 Celery與RabbitMQ怎么在Python中使用了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。