您好,登錄后才能下訂單哦!
一、線程池
1、為什么需要使用線程池
1.1 創建/銷毀線程伴隨著系統開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率。
記創建線程消耗時間T1,執行任務消耗時間T2,銷毀線程消耗時間T3,如果T1+T3>T2,那說明開啟一個線程來執行這個任務太不劃算了!在線程池緩存線程可用已有的閑置線程來執行新任務,避免了創建/銷毀帶來的系統開銷。
1.2 線程并發數量過多,搶占系統資源從而導致阻塞。
線程能共享系統資源,如果同時執行的線程過多,就有可能導致系統資源不足而產生阻塞的情況。
1.3 對線程進行一些簡單的管理。
比如:延時執行、定時循環執行的策略等,運用線程池都能進行很好的實現。
2、Python中建立線程池的方法
2.1 使用threadpool模塊,這是個python的第三方模塊,支持python2和python3
2.2 使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,python2.7以上版本也可以安裝使用
2.3 自己構建一個線程池
二、隊列(queue)
Queue模塊提供的隊列(FIFO)適用于多線程編程,在生產者(producer)和消費者(consumer)之間線程安全(thread-safe)地傳遞消息或其它數據,因此多個線程可以共用同一個Queue實例。常用方法:
Queue.qsize():返回queue的大小。
Queue.empty():判斷隊列是否為空,通常不太靠譜。
Queue.full():判斷是否滿了。
Queue.put(item, block=True, timeout=None): 往隊列里放數據。
Queue.put_nowait(item):往隊列里存放元素,不等待
Queue.get(item, block=True, timeout=None): 從隊列里取數據。
Queue.get_nowait(item):從隊列里取元素,不等待
Queue.task_done():表示隊列中某個元素是否的使用情況,使用結束會發送信息。
Queue.join():一直阻塞直到隊列中的所有元素都執行完畢。
三、使用threading+Queue處理多任務
假設有十個任務需要處理,打算在后臺開啟五個線程,簡化后的模型
import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #消費者端,從隊列中獲取num num = self.queue.get() print("Retrieved", num) time.sleep(1) #在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號 self.queue.task_done() print("Consumer Finished") def main(): #產生一個 threads pool, 并把消息傳遞給thread函數進行處理,這里開啟10個并發 for i in range(5): t = ThreadNum(queue) t.setDaemon(True) t.start() #往隊列中填數據 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() if __name__ == '__main__': main() time.sleep(500)
輸出為:
('Retrieved', 0) ('Retrieved', 1)('Retrieved', 2) ('Retrieved', 3) ('Retrieved', 4) ('Retrieved', 5)('Retrieved', 6) ('Retrieved', 7) ('Retrieved', 8) ('Retrieved', 9)
具體工作步驟描述如下:
1、創建一個 Queue.Queue() 的實例,然后使用數據對它進行填充。
2、將經過填充數據的實例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創建的。
3、生成守護線程池。
4、每次從隊列中取出一個項目,并使用該線程中的數據和 run 方法以執行相應的工作。
5、在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
6、對隊列執行 join 操作,實際上意味著等到隊列為空,再退出主程序。
在使用這個模式時需要注意一點:通過將守護線程設置為 true,程序運行完自動退出。好處是在退出之前,可以對隊列執行 join 操作、或者等到隊列為空。
注意運行main函數后繼續執行time.sleep(500),可以觀察到主線程未結束的情況下ThreadNum(queue)生成的線程還在運行。如果需要停止線程的話可以對以上代碼加以修改。
import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """沒打印一個數字等待1秒,并發打印10個數字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): done = False while not done: #消費者端,從隊列中獲取num num = self.queue.get() if num is None: done = True else: print("Retrieved", num) time.sleep(1) #在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號 self.queue.task_done() print("Consumer Finished") def main(): #產生一個 threads pool, 并把消息傳遞給thread函數進行處理,這里開啟10個并發 for i in range(5): t = ThreadNum(queue) t.setDaemon(True) t.start() #往隊列中填錯數據 for num in range(10): queue.put(num) queue.join() time.sleep(100) for i in range(10): queue.put(None) print('None') time.sleep(200) if __name__ == '__main__': start = time.time() main() print"Elapsed Time: %s" % (time.time() - start)
main函數執行完后隊列向線程發送None消息,觸發線程的停止標識,這樣就可以動態管理線程池了。
以上這篇Python 使用threading+Queue實現線程池示例就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。