您好,登錄后才能下訂單哦!
小編給大家分享一下Python如何使用定時調度任務,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
使用簡單循環來實現調度任務這是毫不費力的。使用無限運行的 while
循環定期調用函數可用于調度作業,但這不是最好的方法,不過它是很有效的。可以使用內置time模塊的slleep()
來延遲執行。不過這并不是大多數作業的調度方式,因為,它看起來很難看,而且與其他方法相比,它的可讀性較差。
import time def task(): print("Job Completed!") while 1: task() time.sleep(10)
當涉及到每天早上 9:00 或每周三晚上 7:45 等這些日程安排時,事情就變得比較棘手了。
import datetime def task(): print("Job Completed!") while 1: now = datetime.datetime.now() # schedule at every wednesday,7:45 pm if now.weekday == 3 and now.strftime("%H:%m") == "19:45": task() # sleep for 6 days time.sleep(6 * 24 * 60 * 60)
這是我的第一時間想到的解決辦法,不用謝!這種方法的一個問題是這里的邏輯是阻塞的,即一旦在 python
項目中發現這段代碼,它就會卡在 while 1 循環中,從而阻塞其他代碼的執行。
線程是計算機科學中的一個概念。具有自己指令的小程序由進程執行并獨立管理,這就可以解決我們第一種方法的阻塞情況,讓我們看看怎么樣。
import time import threading def task(): print("Job Completed!") def schedule(): while 1: task() time.sleep(10) # makes our logic non blocking thread = threading.Thread(target=schedule) thread.start()
線程啟動后,其底層邏輯無法被主線程修改,因此我們可能需要添加資源,程序通過這些資源可以檢查特定場景并根據它們執行邏輯。
早些時候,我說使用 while
循環進行調度看起來很丑陋,調度庫可以解決這個問題。
import schedule import time def task(): print("Job Executing!") # for every n minutes schedule.every(10).minutes.do(task) # every hour schedule.every().hour.do(task) # every daya at specific time schedule.every().day.at("10:30").do(task) # schedule by name of day schedule.every().monday.do(task) # name of day with time schedule.every().wednesday.at("13:15").do(task) while True: schedule.run_pending() time.sleep(1)
正如您所見,通過這樣我們可以毫不費力地創建多個調度計劃。我特別喜歡創建作業的方式和方法鏈(Method Chaining
),另一方面,這個片段有一個 while
循環,這意味著代碼被阻塞,不過我相信你已經知道什么可以幫助我們解決這個問題。
Liunx
中的 crontab
實用程序是一種易于使用且被廣泛接受的調度解決方案。Python
庫python-crontab
提供了一個 API 來使用 Python
中的 CLI
工具。在crontab
中,一個定時調度使用 unix-cron
字符串格式( *)來描述,它是一組五個值的一條線,這表明當作業應該被執行時,python-crontab
將在文件中寫入 crontab
的計劃轉換為寫入編程方法。
from crontab import CronTab cron = CronTab(user='root') job = cron.new(command='my_script.sh') job.hour.every(1) cron.write()
python-crontab
不會自動保存計劃,需要執行 write()
方法來保存計劃。還有更多功能,我強烈建議您查看他們的文檔。
有些任務不能立即執行,因此我們需要根據 LIFO
或 FIFO
等隊列系統創建任務隊列并彈出任務。python-rq
允許我們做到這一點,使用 Redis
作為代理來排隊作業。新作業的條目存儲為帶有信息的哈希映射,例如created_at
, enqueued_at
, origin
, data
, description
.
排隊任務由名為 worker
的程序執行。workers
在 Redis
緩存中也有一個條目,負責將任務出列以及更新 Redis 中的任務狀態。任務可以在需要時排隊,但要安排它們,我們需要rq-scheduler
。
from rq_scheduler import Scheduler queue = Queue('circle', connection=Redis()) scheduler = Scheduler(queue=queue) scheduler.schedule( scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone func=func, # Function to be queued args=[arg1, arg2], # Arguments passed into function when executed kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed interval=60, # Time before the function is called again, in seconds repeat=None, # Repeat this number of times (None means repeat forever) meta={'foo': 'bar'} # Arbitrary pickleable data on the job itself )
RQ worker
(RQ 工作器)必須在終端中單獨啟動或通過 python-rq
工作器啟動。一旦任務被觸發,就可以在工作終端中看到,在成功和失敗場景中都可以使用單獨的函數回調。
以上是“Python如何使用定時調度任務”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。