您好,登錄后才能下訂單哦!
python的multiprocessing模塊是跨平臺的多進程模塊,multiprocessing具有創建子進程,進程間通信,隊列,事件,鎖等功能,multiprocessing模塊包含Process,Queue,Pipe,Lock等多個組件。
創建進程的類
Process([group [, target [, name [, args [, kwargs]]]]])
參數介紹:
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=()
kwargs表示調用對象的字典,kwargs={'key':'value'}
name為子進程的名稱
Note:需要使用關鍵字方式指定參數
from multiprocessing import Process
def func():
print("first process")
if __name__ == '__main__':
# 創建進程對象,主進程和子進程是異步執行的
p = Process(target=func)
# 開啟進程
p.start()
from multiprocessing import Process
def func(*args,**kwargs):
print("IPADDR:%s PORT:%d"%args)
for k in kwargs:
print("%s --> %s"%(k,kwargs[k]))
if __name__ == '__main__':
# 創建進程對象,并傳遞參數
p = Process(target=func,args=('127.0.0.1',8080),kwargs={'key':'value'})
# 如果主進程中的代碼已經結束了,子進程還沒結束,主進程會等待子進程
# 開啟進程
p.start()
import os
from multiprocessing import Process
def func():
# os模塊的getpid方法可以獲取當前進程的pid,getppid方法可以獲取當前進程的父進程的pid
print("子進程pid:%s,父進程pid:%s"%(os.getpid(),os.getppid()))
if __name__ == '__main__':
p_l = []
# 創建多個進程
for i in range(10):
p = Process(target=func)
p.start()
p_l.append(p)
# 異步執行子進程,最后執行主進程中的代碼
for p in p_l:
p.join() # 阻塞,使主進程等待子進程結束
print("------主進程------")
結果:
子進程pid:9944,父進程pid:1484
子進程pid:8932,父進程pid:1484
子進程pid:8504,父進程pid:1484
子進程pid:14884,父進程pid:1484
子進程pid:4828,父進程pid:1484
子進程pid:14644,父進程pid:1484
子進程pid:14908,父進程pid:1484
子進程pid:1980,父進程pid:1484
子進程pid:14604,父進程pid:1484
子進程pid:10008,父進程pid:1484
------主進程------
Note :因為在windows操作系統中,沒有fork(),在創建子進程的時候會自動運行啟動它的文件中的所有代碼,因此必須將創建子進程的語句寫在ifname=='main':條件語句下。
import os
from multiprocessing import Process
class MyProcess(Process): # 必須繼承Process類
def __init__(self,arg1,arg2,arg3):
'''
繼承父類的初始化方法,加上自己需要的參數
:param arg1:
:param arg2:
:param arg3:
'''
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
def run(self):
'''
必須要有run方法的實現
:return:
'''
print('子進程:%d ,父進程:%s '%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
self.walk() # walk方法在子進程中執行
def walk(self):
print('子進程:%d'%os.getpid())
if __name__ == '__main__':
p = MyProcess(1,2,3)
p.start() # 會默認調用run方法
p.walk() # walk方法直接在主進程中調用,并沒有在子進程中執行
print('父進程:%d '%os.getpid())
結果:
子進程:1220
父進程:1220
子進程:2164 ,父進程:1220 1 2 3
子進程:2164
在為開啟daemon前,主進程會等待子進程結束在結束;
開啟daemon后,程序會在主進程結束時結束子進程
import time
from multiprocessing import Process
def cal_time(second):
while True:
print("current time:%s"%time.ctime())
time.sleep(second)
if __name__ == '__main__':
p = Process(target=cal_time,args=(1,))
'''
守護進程的作用:會隨著主進程代碼執行結束而結束
守護進程要在start前設置
守護進程中不能再開啟子進程
'''
p.daemon = True
p.start()
for i in range(10):
time.sleep(0.2)
print('*'*i)
未開啟daemon結果:子進程一直在運行
current time:Tue Feb 12 17:48:44 2019
*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019
開啟daemon后結果:主進程結束程序就結束了
current time:Tue Feb 12 17:49:14 2019
*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********
name:查看進程名
pid:查看進程id
is_alive:查看進程是否正在運行
terminate:結束進程
import time
from multiprocessing import Process
def func():
print("start")
time.sleep(3)
print("end")
if __name__ == '__main__':
p = Process(target=func)
p.start()
time.sleep(3)
print("進程名:%s,進程id:%s"%(p.name,p.pid))
# is_alive方法查看進程是否正在運行
print(p.is_alive())
# terminate方法結束進程
p.terminate()
time.sleep(3)
print(p.is_alive())
結果:
start
進程名:Process-1,進程id:17564
True
False
進程鎖:當多個進程訪問共享資源時,進程鎖保證同一時間只能有一個任務可以進行修改,程序的運行方法有并發改為串行,這樣速度慢了,但是保證了數據的安全
import os
import time
import random
from multiprocessing import Process,Lock
def func(lock,n):
lock.acquire() #加鎖
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
lock.release() #釋放
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=func,args=(lock,i))
p.start()
信號量:Lock(鎖)可以保證同一時間只能有一個任務對共享數據進行操作,而Semaphore(信號量)可以在同一時間讓指定數量的進程操作共享數據。
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
'''
迷你唱吧,20個人,同一時間只能有4個人進去
'''
def sing(i,sem):
sem.acquire() # 加鎖
print('%s enter the ktv'%i)
time.sleep(random.randint(1,10))
print('%s leave the ktv'%i)
sem.release() # 釋放
if __name__ == '__main__':
sem = Semaphore(4)
for i in range(20):
p = Process(target=sing,args=(i,sem))
p.start()
事件:Event是進程之間的狀態標記通信,因為進程不共享數據,所以事件對象需要以參數形式傳遞到函數中使用。
e = Event() # 實例化一個事件對象
e.set() # 標記變為非阻塞
e.wait() # 默認標記為阻塞,在等待的過程中,遇到非阻塞信號就繼續執行
e.clear() # 標記變為阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞
import time
import random
from multiprocessing import Event
from multiprocessing import Process
def traffic_light(e):
while True:
if e.is_set(): # True為綠燈
time.sleep(3) # 等3秒后變為紅燈
print("紅燈亮")
e.clear()
else: # False為紅燈,等3秒后變為綠燈
time.sleep(3)
print("綠燈亮")
e.set()
def car(i,e):
e.wait() # 默認是紅燈
print("%s 車通過"%i)
if __name__ == '__main__':
e = Event()
# 控制紅綠燈的進程
tra = Process(target=traffic_light,args=(e,))
tra.start()
for i in range(100):
if i%6 == 0:
time.sleep(random.randint(1,3))
p = Process(target=car,args=(i,e))
p.start()
管道是進程間通信(IPC)的一種,管道是雙向通信的,但它不保證數據安全
創建管道:p1,p2=Pipe()
send():發送數據
recv():接收數據
close():關閉
def func(p):
foo,son = p
foo.close() # 不使用主進程的管道一端,先行關閉
while True:
try:
print(son.recv())
# 子進程在結束數據時,如果管道無數據,且對端沒有close,就會報EOFError;如果管道無數據,對端沒close,進程會阻塞
except EOFError:
break
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))
p.start()
son.close() # 不使用子進程的管道一端,先行關閉
foo.send("hello")
foo.send("hello")
foo.close()
隊列:進程之間是獨立的,要實現進程間通信(IPC);multiprocessing模塊支持兩種形式:隊列(queue)和管道(pipe),這兩種方式都是使用消息傳遞的,且都是雙向通信的,Queue = Pipe+Lock。
q = Queue() # 創建隊列對象,無長度限制
q1 = Queue(3) # 傳參數,創建一個有最大長度限制的隊列
q.put(1) # 放入一個數據,對于無長度限制的隊列來說,永不阻塞;對于有長度限制的隊列來說,放滿就阻塞
q.get() # 隊列中有數據就取出一個數據,隊列中無數據就會阻塞;遵循先進先出原則
q.qsize() # 查看隊列的數據大小,不一定準確
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子進程隊列中放入一個變量
if __name__ == '__main__':
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
print(q.get()) # 主進程獲取到變量
示例2:子進程與子進程之間的通信
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子進程隊列中放入一個變量
def queue_get(q):
print(q.get()) # 另一個子進程獲取到隊列中的數據
if __name__ == '__main__':
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
p1 = Process(target=queue_get,args=(q,))
p1.start()
JoinableQueue也是multiprocessing模塊的一種隊列的實現,但它與Queue不同的是JoinableQueue允許項目的使用者通知生成者項目已經被成功處理。創建方式同Queue。
主要方法:put與get與Queue一致
? ? q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常
? ? q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process
'''
程序執行流程
1、生產者生產的數據全部被消費 --> 2、生產者進程結束 --> 3、主進程代碼執行結束 --> 4、消費者守護進程結束
'''
def producer(q,food):
for i in range(5):
q.put("%s -- %s"%(i,food))
print("生產了 %s"%(food))
time.sleep(random.random())
q.join() # 2、等待消費者消費完所有數據
def consumer(q,name):
while True:
food = q.get()
if food == None:break
print("%s 吃了 %s"%(name,food))
q.task_done() # 1、消費者每消費一個數據就返回一個task_done給生產者
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'youtiao'))
p1.start()
p2 = Process(target=producer,args=(q,'baozi'))
p2.start()
c1 = Process(target=consumer,args=(q,'daxiong'))
c1.daemon = True # 4、消費者守護進程結束
c1.start()
c2 = Process(target=consumer,args=(q,'chenglei'))
c2.daemon = True
c2.start()
c3 = Process(target=consumer,args=(q,'niu'))
c3.daemon = True
c3.start()
p1.join() # 3、等待p1執行完畢
p2.join() # 3、等待p2執行完畢
Manager也是multiprocessing模塊的一個類,這個類主要提供了進程間通信(IPC)的一個機制,它支持Python所有的數據類型,但不提供數據安全的機制。
from multiprocessing import Manager
from multiprocessing import Process
def func(d):
print(d)
d['num'] -= 10
if __name__ == '__main__':
m = Manager()
d = m.dict({'num':100})
l = []
for i in range(10):
p = Process(target=func,args=(d,))
p.start()
# p.join() # 同步
l.append(p)
for j in l:
j.join() # 異步
結果:
{'num': 100}
{'num': 90}
{'num': 80}
{'num': 70}
{'num': 60}
{'num': 50}
{'num': 40}
{'num': 30}
{'num': 20}
{'num': 10}
在執行大量并發任務時,多進程是行之有效的手段之一,但是多進程需要注意幾個問題,一是操作系統不可能無限開啟進程,一般是有幾個核開啟幾個進程,二是開啟進程過多,系統資源占用過多,會導致系統運行速度變慢;那么遇到這種情況時pool(進程池)便是最好的解決方案。
Pool可以指定開啟一定數量的進程(一般為CPU核數+1個)等待用戶使用,當有新的請求進入時,如果池中有空閑進程,便直接開啟;如果池中的進程都在使用,那么該請求就會等待,直到池中有進程結束,重用該進程。
import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
i -= 1
if __name__ == '__main__':
# 計算進程池所需事件
start1_time = time.time() # 開始時間
p = Pool(5) # 進程池中創建5個進程
p.map(func,range(100)) # 調用進程執行任務,target = func args = (1,2,3...),第二個參數要是可迭代對象
p.close() # 不允許再向進程池中添加任務
p.join() # 等待進程池中所有進程執行結束
stop1_time = time.time() - start1_time # 結束時間
print("進程池所需時間: %s "%stop1_time)
# 計算多進程所需時間
start2_time = time.time() # 開始時間
l = []
for i in range(100):
p1 = Process(target=func,args=(i,))
p1.start()
l.append(p)
for j in l:
j.join()
stop2_time = time.time() - start2_time
print("多進程所需時間: %s"%stop2_time)
結果:
進程池所需時間: 0.19990277290344238
多進程所需時間: 1.7190303802490234
由上可知,進程池在執行大量并發任務時的效率。
map(self, func, iterable, chunksize=None):將func
應用于iterable
中的每個元素,收集結果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):異步提交任務的機制
apply(self, func, args=(), kwds={}):同步提交任務的機制
close():不允許再提交新的任務
join():等待進程池中的進程執行結束在往下執行,此方法只能在close()或teminate()之后調用
執行apply或apply_async方法時,會返回ApplyResult類的實例對象
ApplyResult類有以下方法:
obj.get():獲取進程的返回值
obj.ready():調用完成時,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發ValueError異常
obj.wait([timeout]):等待結果變為可用
import time
from multiprocessing import Pool
'''
apply:同步提交任務的機制
apply_async:異步提交任務機制
'''
def func(i):
time.sleep(1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5)
res_l = []
for i in range(20):
# p.apply(func,args=(i,)) # 同步,執行完畢立即獲取到返回值
res = p.apply_async(func,args=(i,)) # 異步,通過get獲取返回值
res_l.append(res)
p.close() # 不允許再提交新的任務
p.join() # 等待進程池中的進程執行結束在往下執行
for res in res_l:
print(res.get()) # 使用get來獲取apply_aync的結果
在進程池中,一個進程任務結束就會返回一個結果,主進程則調用一個函數去處理這個結果,這就是回調函數。回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值;
在爬蟲中,使用回調比較多,爬蟲將訪問網頁、下載網頁的過程放到子進程中去做,分析數據,處理數據讓回調函數去做,因為訪問網頁與下載網頁有網絡延時,而處理數據只占用很小的時間
import requests
from multiprocessing import Pool
def get(url):
ret = requests.get(url)
return {'url':url,
'status_code':ret.status_code,
'content':ret.text}
def parser(dic):
print(dic['url'],len(dic['content']))
parse_url = "URL:%s Size:%s"%(dic['url'],len(dic['content']))
with open('url.txt','a') as f:
f.write(parse_url+'\n')
if __name__ == '__main__':
url_l = [
'http://www.baidu.com',
'http://www.google.com',
'https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5',
'https://www.youtube.com/?app=desktop',
'https://www.facebook.com/'
]
p = Pool(5)
for i in url_l:
p.apply_async(get,args=(i,),callback=parser)
p.close()
p.join()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。