亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Python?Celery動態添加定時任務

發布時間:2023-05-19 17:06:53 來源:億速云 閱讀:137 作者:iii 欄目:編程語言

本篇內容介紹了“如何使用Python Celery動態添加定時任務”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

    一、背景

    實際工作中會有一些耗時的異步任務需要使用定時調度,比如發送郵件,拉取數據,執行定時腳本

    通過celery 實現調度主要思想是 通過引入中間人redis,啟動 worker 進行任務執行 ,celery-beat進行定時任務數據存儲

    二、Celery動態添加定時任務的官方文檔

    celery文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自定義調度類說明:

    自定義調度器類可以在命令行中指定(--scheduler參數)

    django-celery-beat文檔 : https://pypi.org/project/django-celery-beat/

    關于django-celery-beat 插件的說明:

    此擴展使您能夠將定期任務計劃存儲在數據庫中,可以從 Django 管理界面管理周期性任務,您可以在其中創建、編輯和刪除周期性任務以及它們應該運行的頻率

    三、celery簡單實用

    3.1 基礎環境配置

    1. 安裝最新版本的Django

    pip3 install django #當前我安裝的版本是 3.0.6

    2. 創建項目

    django-admin startproject typeidea
    django-admin startapp blog

    3.安裝 celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于動態添加定時任務
    pip3 install django-celery-results
    pip3 install redis
    3.2 測試使用Celery應用

    1. 創建blog目錄、新建task.py

    首先在Django項目中創建一個blog文件夾,并且在blog文件夾下創建tasks.py模塊, 如下:

    如何使用Python?Celery動態添加定時任務

    tasks.py代碼如下:

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做為broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 創建任務函數
    @app.task
    def my_task():
        print('任務正在執行...')

    Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 將其注冊到broker的隊列中。

    2. 啟動redis、創建worker

    現在我們在創建一個worker, 等待處理隊列中的任務。

    進入項目的根目錄,執行命令: celery -A celery_tasks.tasks worker -l info

    如何使用Python?Celery動態添加定時任務

    3. 調用任務

    下面來測試一下功能,創建一個任務,加入任務隊列中,提供worker執行。

    進入python終端, 執行如下代碼:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    調用一個任務函數,將會返回一個AsyncResult對象,這個對象可以用來檢查任務的狀態或者獲得任務的返回值。

    4. 查看結果

    在worker的終端查看任務執行情況,可以看到已經收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務,并打印了任務執行信息

    如何使用Python?Celery動態添加定時任務

    5. 存儲并查看任務執行狀態

    把任務執行結果賦值給ret,然后調用result() 會產生 DisabledBackend 報錯,可見沒有配置后端存儲的時候并不能保存任務執行的狀態信息,下一節我們會講到如何配置backend保存任務執行結果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    如何使用Python?Celery動態添加定時任務

    四、配置backend存儲任務執行結果

    如果我們想跟蹤任務的狀態,Celery需要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 添加backend參數

    在本例中我們使用Redis作為存儲結果的方案,通過Celery的backend參數來設定任務結果存儲地址。我們將tasks模塊修改如下:

    from celery import Celery
     
    # 使用redis作為broker以及backend
    app = Celery('celery_tasks.tasks',
                 broker='redis://127.0.0.1:6379/8',
                 backend='redis://127.0.0.1:6379/9')
     
    # 創建任務函數
    @app.task
    def my_task(a, b):
        print("任務函數正在執行....")
        return a + b

    給Celery增加了backend參數,指定redis作為結果存儲,并將任務函數修改為兩個參數,并且有返回值。

    2. 調用任務/查看任務執行結果

    下面再來執行調用一下這個任務看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    再來看看worker的執行情況,如下:

    如何使用Python?Celery動態添加定時任務

    可以看到celery任務已經執行成功了。

    但是這只是一個開始,下一步要看看如何添加定時的任務。

    四、優化Celery目錄結構

    上面直接將Celery的應用創建、配置、tasks任務全部寫在了一個文件,這樣在后面項目越來越大,也是不方便的。下面來拆分一下,并且添加一些常用的參數。

    基本結構如下

    如何使用Python?Celery動態添加定時任務

    $ vim typeidea/celery.py (Celery應用文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name='typeidea'
    # set the default django setting module for the 'celery' program
    os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
    app = Celery(project_name)
     
    app.config_from_object('django.conf:settings')
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的參數文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 設置結果存儲
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 設置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的啟動工作數量設置
    CELERY_WORKER_CONCURRENCY = 20
    # 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情況下可以防止死鎖
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 執行多少個任務后進行重啟操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    vim blog/tasks.py (tasks 任務文件)

    import time
    from blog.celery import app
     
    # 創建任務函數
    @app.task
    def my_task(a, b, c):
        print('任務正在執行...')
        print('任務1函數休眠10s')
        time.sleep(10)
        return a + b + c

    五、開始使用django-celery-beat調度器

    使用 django-celery-beat 動態添加定時任務  celery 4.x 版本在 django 框架中是使用 django-celery-beat 進行動態添加定時任務的。前面雖然已經安裝了這個庫,但是還要再說明一下。

    1. 安裝 django-celery-beat

    pip3 install django-celery-beat

    2.在項目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [
        'blog',
        'django_celery_beat',
        ...
    ]
     
    # Django設置時區
    LANGUAGE_CODE = 'zh-hans'  # 使用中國語言
    TIME_ZONE = 'Asia/Shanghai'  # 設置Django使用中國上海時間
    # 如果USE_TZ設置為True時,Django會使用系統默認設置的時區,此時的TIME_ZONE不管有沒有設置都不起作用
    # 如果USE_TZ 設置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間。
    USE_TZ = False

    3. 創建 django-celery-beat 相關表

    執行Django數據庫遷移: python manage.py migrate

    如何使用Python?Celery動態添加定時任務

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 為celery 設置環境變量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 創建celery app
    app = Celery('blog')
    # 從單獨的配置模塊中加載配置
    app.config_from_object(celeryconfig)
     
    # 設置app自動加載任務
    app.autodiscover_tasks([
        'blog',
    ])

    配置 celeryconfig.py

    # 設置結果存儲
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 設置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的啟動工作數量設置
    CELERY_WORKER_CONCURRENCY = 20
    # 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情況下可以防止死鎖
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 執行多少個任務后進行重啟操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    編寫任務 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做為broker
    # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')
     
    # 創建任務函數
    @app.task
    def my_task(a, b, c):
        print('任務正在執行...')
        print('任務1函數休眠10s')
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任務2函數正在執行....")
        print('任務2函數休眠10s')
        time.sleep(10)

    5. 啟動定時任務work

    啟動定時任務首先需要有一個work執行異步任務,然后再啟動一個定時器觸發任務。

    啟動任務 work

    $ celery -A blog worker -l info

    如何使用Python?Celery動態添加定時任務

    啟動定時器觸發 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    如何使用Python?Celery動態添加定時任務

    六、具體操作演練

    6.1 創建基于間隔時間的周期性任務

    1. 初始化周期間隔對象interval 對象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.創建一個無參數的周期性間隔任務

    >>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 調度服務日志顯示如下:

    如何使用Python?Celery動態添加定時任務

    worker 服務日志顯示如下:

    如何使用Python?Celery動態添加定時任務

    3.創建一個帶參數的周期性間隔任務

    >>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 調度服務日志結果:

    如何使用Python?Celery動態添加定時任務

    worker 服務日志結果:

    如何使用Python?Celery動態添加定時任務

    4.如何高并發執行任務

    需要并行執行任務的時候,就需要設置多個worker來執行任務。

    6.2 創建一個不帶參數的周期性間隔任務

    1.初始化 crontab 的調度對象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute='*',
    ... hour='*',
    ... day_of_week='*',
    ... day_of_month='*',
    ... timezone=pytz.timezone('Asia/Shanghai')
    ... )

    2. 創建不帶參數的定時任務

    PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

    beat 調度服務執行結果

    如何使用Python?Celery動態添加定時任務

    worker 執行服務結果

    如何使用Python?Celery動態添加定時任務

    6.3 周期性任務的查詢、刪除操作

    1. 周期性任務的查詢

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

    控制臺實際操作記錄

    如何使用Python?Celery動態添加定時任務

    2.周期性任務的暫停/啟動

    2.1 設置my_taks2_crontab 暫停任務

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker輸出:

    如何使用Python?Celery動態添加定時任務

    可以看到worker從19:31以后已經沒有輸出了,說明已經成功吧my_task2_crontab 任務暫停

    2.2 設置my_task2_crontab 開啟任務

    把任務的 enabled 為 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker輸出:

    如何使用Python?Celery動態添加定時任務

    可以看到worker從19:36開始有輸出,說明已把my_task2_crontab 任務重新啟動

    3. 周期性任務的刪除

    獲取到指定的任務后調用delete(),再次查詢指定任務會發現已經不存在了

    PeriodicTask.objects.get(name='my_task2_crontab').delete()
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    “如何使用Python Celery動態添加定時任務”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    嘉黎县| 黔东| 白水县| 旬阳县| 杨浦区| 翁牛特旗| 清水河县| 喀什市| 肇庆市| 霍林郭勒市| 达拉特旗| 犍为县| 家居| 徐汇区| 亚东县| 沭阳县| 那坡县| 青海省| 吉林市| 江永县| 新丰县| 渝北区| 中山市| 丰顺县| 许昌市| 嘉善县| 昭苏县| 丁青县| 得荣县| 咸丰县| 高平市| 甘德县| 广东省| 临漳县| 略阳县| 郓城县| 道真| 尤溪县| 澜沧| 杭锦旗| 吉安县|