您好,登錄后才能下訂單哦!
這篇文章主要介紹了Python中消息隊列與進程池的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Queue消息隊列
1.創建
import multiprocessing queue = multiprocessing.Queue(隊列長度)
2.方法
方法 | 描述 |
---|---|
put | 變量名.put(數據),放入數據(如隊列已滿,則程序進入阻塞狀態,等待隊列取出后再放入) |
put_nowait | 變量名.put_nowati(數據),放入數據(如隊列已滿,則不等待隊列信息取出后再放入,直接報錯) |
get | 變量名.get(數據),取出數據(如隊列為空,則程序進入阻塞狀態,等待隊列防如數據后再取出) |
get_nowait | 變量名.get_nowait(數據),取出數據(如隊列為空,則不等待隊列放入信息后取出數據,直接報錯),放入數據后立馬判斷是否為空有時為True,原因是放入值和判斷同時進行 |
qsize | 變量名.qsize(),消息數量 |
empty | 變量名.empty()(返回值為True或False),判斷是否為空 |
full | 變量名.full()(返回值為True或False),判斷是否為滿 |
3.進程通信
因為進程間不共享全局變量,所以使用Queue進行數據通信,可以在父進程中創建兩個字進程,一個往Queue里寫數據,一個從Queue里取出數據。
例:
import multiprocessing import time def write_queue(queue): # 循環寫入數據 for i in range(10): if queue.full(): print("隊列已滿!") break # 向隊列中放入消息 queue.put(i) print(i) time.sleep(0.5) def read_queue(queue): # 循環讀取隊列消息 while True: # 隊列為空,停止讀取 if queue.empty(): print("---隊列已空---") break # 讀取消息并輸出 result = queue.get() print(result) if __name__ == '__main__': # 創建消息隊列 queue = multiprocessing.Queue(3) # 創建子進程 p1 = multiprocessing.Process(target=write_queue, args=(queue,)) p1.start() # 等待p1寫數據進程執行結束后,再往下執行 p1.join() p1 = multiprocessing.Process(target=read_queue, args=(queue,)) p1.start()
執行結果:
Pool進程池
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務。
1.創建
import multiprocessing pool = multiprocessing.Pool(最大進程數)
2.方法
方法 | 描述 |
---|---|
apply() | 以同步方式添加進程 |
apply_async() | 以異步方式添加進程 |
close() | 關閉Pool,使其不接受新任務(還可以使用) |
terminate() | 不管任務是否完成,立即終止 |
join() | 主進程阻塞,等待子進程的退出,必須在close和terminate后使用 |
3.進程池內通信
創建進程池內Queue消息隊列通信
import multiprocessing Queue:queue = multiprocessing.Manager().Queue()
例:
import multiprocessing import time
寫入數據的方法
def write_data(queue): # for循環 向消息隊列中寫入值 for i in range(5): # 添加消息 queue.put(i) print(i) time.sleep(0.2) print("隊列已滿~")
創建讀取數據的方法
def read_data(queue): # 循環讀取數據 while True: # 判斷隊列是否為空 if queue.qsize() == 0: print("隊列為空~") break # 從隊列中讀取數據 result = queue.get() print(result) if __name__ == '__main__': # 創建進程池 pool = multiprocessing.Pool(2) # 創建進程池隊列 queue = multiprocessing.Manager().Queue() # 在進程池中的進程間進行通信 # 使用線程池同步的方式,先寫后讀 # pool.apply(write_data, (queue, )) # pool.apply(read_data, (queue, )) # apply_async() 返回ApplyResult 對象 result = pool.apply_async(write_data, (queue, )) # ApplyResult對象的wait() 方法,表示后續進程必須等待當前進程執行完再繼續 result.wait() pool.apply_async(read_data, (queue, )) pool.close() # 異步后,主線程不再等待子進程執行結束,再結束 # join() 后,表示主線程會等待子進程執行結束后,再結束 pool.join()
運行結果:
4.案例(文件夾copy器)
代碼:
# 導入模塊 import os import multiprocessing # 拷貝文件函數 def copy_dir(file_name, source_dir, desk_dir): # 要拷貝的文件路徑 source_path = source_dir+'/'+file_name # 目標路徑 desk_path = desk_dir+'/'+file_name # 獲取文件大小 file_size = os.path.getsize(source_path) # 記錄拷貝次數 i = 0 # 以二進制度讀方式打開原文件 with open(source_path, "rb") as source_file: # 以二進制寫入方式創建并打開目標文件 with open(desk_path, "wb") as desk_file: # 循環寫入 while True: # 讀取1024字節 file_data = source_file.read(1024) # 如果讀到的不為空,則將讀到的寫入目標文件 if file_data: desk_file.write(file_data) # 讀取次數+1 i += 1 # 拷貝百分比進度等于拷貝次數*1024*100/文件大小 n = i*102400/file_size if n >= 100: n = 100 print(file_name, "拷貝進度%.2f%%" % n) else: print(file_name, "拷貝成功") break if __name__ == '__main__': # 要拷貝的文件夾 source_dir = 'test' # 要拷貝到的路徑 desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir # 存在文件夾則不創建 try: os.mkdir(desk_dir) except: print("目標文件夾已存在,未創建") # 獲取文件夾內文件目錄,存到列表里 file_list = os.listdir(source_dir) print(file_list) # 創建進程池,最多同時運行3個子進程 pool = multiprocessing.Pool(3) for file_name in file_list: # 異步方式添加到進程池內 pool.apply_async(copy_dir, args=(file_name, source_dir, desk_dir)) # 關閉進程池(停止添加,已添加的還可運行) pool.close() # 讓主進程阻塞,等待子進程結束 pool.join()
運行結果:
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Python中消息隊列與進程池的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。