您好,登錄后才能下訂單哦!
第1章?celery
Celery是一個簡單靈活,且可靠,處理大量消息的分布式系統
用python寫的,用來執行定時任務和異步任務的框架
消息中間àbroker
任務執行單元àworker
任務執行結果存儲àstore
異步任務:將耗時操作任務交給celery去異步執行,比如發送短信/郵件,消息推送,音視頻處理等
定時任務:定時執行某件事情,比如每天的數據統計
?
pip install celery
消息中間件:rabbitmq/redis,這里使用redis
docker run -p 6379:6379 --name=redis -d redis:latest redis-server
定義任務:
from?celery?import?Celery
import?time
#?任務產生存儲
broker =?'redis://10.211.55.8:16379/1'
#?任務執行結果存儲
backend =?'redis://10.211.55.8:16379/2'
#?第一個參數是別名,隨便寫即可
app = Celery('first_celery',?backend=backend,?broker=broker)
@app.task
def?test_celery(x,?y):
????time.sleep(2)
????return?x +?y
添加任務:
import?task
if?__name__ ==?'__main__':
????# result不是函數的執行結果,是一個對象
????result = tasks.add.delay(2,3)
????print(result.id)
使用celery命令開啟work進程
celery –A tasks worker –l info
?
當有多個任務需要執行時,會有多個任務函數,位了方便解藕,我們可以使用這種方式,方便管理
celery.py
from?celeryimport?Celery
broker =?'redis://10.211.55.8:16379/2'
backend =?'redis://10.211.55.8:16379/3'
# include包含兩個任務的函數,去相應文件中找對應的任務函數
cel = Celery('test',?broker=broker,?backend=backend,?include=['celery_task.task',?'celery_task.task2'])
#?時區,有時候我們想在固定的時間來執行相關任務,就需要設置時區相關配置
cel.conf.timezone =?'Asia/Shanghai'
#?是否UTC
cel.conf.enable_utc =?False
task.py
from?.celeryimport?app
@app.task
def?sum(x,?y):
????returnx+y
task2.py
from?.celeryimport?app
@app.task
def?less(x,?y):
????returnx-y
在celery_task目錄下創建添加任務的py文件
from?celery_taskimport?task
from?celery_task?import?task2
task.sum.delay(1,?2)
task2.less.delay(3,?1)
from?celery_taskimport?task
from?celery_task?import?task2
from?datetime?import?datetime,?timedelta
# #?方式一:
# #獲取時間對象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# #?將時間轉換成UTC時間
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# #?用apply_async,args是函數參數,eta是指定時間
# result = task.add.apply_async(args=[1, 2], eta=utc_time)
#?方式二:
ctime = datetime.now()
#?把當前時間轉換成UTC時間
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
#?當前時間延遲十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(args=[2,?3],?eta=task_time)
# task.sum.delay(1, 2)
# task2.less.delay(3, 1)
?多任務結構中celery.py文件如下
from?celeryimport?Celery
from?datetime?import?timedelta
from?celery.schedules?import?crontab
broker =?'redis://10.211.55.8:16379/2'
backend =?'redis://10.211.55.8:16379/3'
# include包含兩個任務的函數,去相應文件中找對應的任務函數
app = Celery('test',?broker=broker,?backend=backend,?include=['celery_task.task',?'celery_task.task2'])
#?時區
app.conf.timezone =?'Asia/Shanghai'
#?是否UTC
app.conf.enable_utc =?False
app.conf.beat_schedules = {
????#別名,可隨意編寫
????'add_crontab': {
????????#執行task下的add函數
????????'task':?'celery_task.task.add',
????????#每隔兩秒執行
????????'schedules': timedelta(seconds=2),
????????#固定時間執行,每年4月11號,8點42分執行
????????#'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
????????#傳遞參數
????????'args': [1,?2],
????}
}
在項目的目錄下創建celeryconfig.py
import?djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
????'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
#?設置并發worker數量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
#?每個worker最多執行100個任務被銷毀,可以防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
#?超時時間
CELERYD_TASK_TIME_LIMIT=12*30
在app目錄下創建tasks.py
from?celeryimport?task
@task
def?add(x,?y):
????returnx+y
試圖函數views中使用
from?django.shortcutsimport?render,HttpResponse
from?app01.tasks?import?add
from?datetime?import?datetime,?timedelta
def?test(request):
????# result=add.delay(2,3)
????ctime = datetime.now()
????#?默認用utc時間
????utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
????time_delay= timedelta(seconds=5)
????task_time = utc_ctime +?time_delay
????result = add.apply_async(args=[4,?3],?eta=task_time)
????print(result.id)
????returnHttpResponse('ok')
settings中注冊
INSTALLED_APPS= [
????'django.contrib.admin',
????'django.contrib.auth',
????'django.contrib.contenttypes',
????'django.contrib.sessions',
????'django.contrib.messages',
????'django.contrib.staticfiles',
????'djcelery',
????'app01',
]
from?djceleryimport?celeryconfig
#?默認使用rabbitmq,redis的話需要指定下
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
?
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。