您好,登錄后才能下訂單哦!
本篇文章為大家展示了Django異步任務線程池實現原理,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
請求任務異步處理的原理
使用python manage.py runserver模式啟動的Django應用只有一個進程,對于每個請求,主線程會開啟一個子線程來處理請求。請求子線程向主線程申請一個新線程,然后把耗時的任務交給新線程,自身立即響應,這就是請求任務異步處理的原理。
可視化線程池
如果想要管理這批異步線程,知道他們是否在運行中,可以使用線程池(ThreadPoolExecutor)。
線程池會先啟動若干數量的線程,并讓這些線程都處于睡眠狀態,當向線程池submit一個任務后,會喚醒線程池中的某一個睡眠線程,讓它來處理這個任務,當處理完這個任務,線程又處于睡眠狀態。
submit任務后會返回一個期程(future),這個對象可以查看線程池中執行此任務的線程是否仍在處理中
因此可以構建一個全局可視化線程池:
from concurrent.futures.thread import ThreadPoolExecutor class ThreadPool(object): def __init__(self): # 線程池 self.executor = ThreadPoolExecutor(20) # 用于存儲每個項目批量任務的期程 self.future_dict = {} # 檢查某個項目是否有正在運行的批量任務 def is_project_thread_running(self, project_id): future = self.future_dict.get(project_id, None) if future and future.running(): # 存在正在運行的批量任務 return True return False # 展示所有的異步任務 def check_future(self): data = {} for project_id, future in self.future_dict.items(): data[project_id] = future.running() return data def __del__(self): self.executor.shutdown() # 主線程中的全局線程池 # global_thread_pool的生命周期是Django主線程運行的生命周期 global_thread_pool = ThreadPool()
使用:
# 檢查異步任務 if global_thread_pool.is_project_thread_running(project_id): raise exceptions.ValidationError(detail='存在正在處理的批量任務,請稍后重試') # 提交一個異步任務 future = global_thread_pool.executor.submit(self.batch_thread, project_id) global_thread_pool.future_dict[project_id] = future # 查看所有異步任務 @login_required def check_future(request): data = global_thread_pool.check_future() return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))
串行執行
使用線程鎖
在全局線程池中初始化線程鎖
class ThreadPool(object): def __init__(self): self.executor = ThreadPoolExecutor(20) self.future_dict = {} self.lock = threading.Lock()
然后執行線程前需要獲取鎖并再執行結束后釋放鎖
def batch_thread(self): global_thread_pool.lock.acquire() try: ... global_thread_pool.lock.release() except Exception: trace_log = traceback.format_exc() logger.error('異步任務執行失敗:\n %s' % trace_log) global_thread_pool.lock.release()
需要捕捉異常預防子線程出錯而無法釋放鎖的情況
異步線程任務執行前先檢查數據庫連接是否可用,然后關掉不可用連接
由于django的數據庫連接是保存到線程本地變量中的,通過ThreadPoolExecutor創建的線程會保存各自的數據庫連接。
當連接被保存的時間超過mysql連接的最大超時時間,連接失效,但不會被線程釋放。
之后再調起線程執行涉及到數據庫操作的異步任務時,會用到失效的數據庫連接,導致報錯“MySQL server has gone away”。
解決方案是在線程池的所有異步任務執行前先檢查數據庫連接是否可用,然后關掉不可用連接
def batch_thread(self): for conn in connections.all(): conn.close_if_unusable_or_obsolete() ...
上述內容就是Django異步任務線程池實現原理,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。