您好,登錄后才能下訂單哦!
這篇文章主要講解了Python并發concurrent.futures和asyncio的用法,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。
說明
Python標準庫為我們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼。
從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,concurrent.futures 模塊的主要特色是 ThreadPoolExecutor 和
ProcessPoolExecutor 類,這兩個類實現的接口能分別在不同的線程或進程中執行可調
用的對象。這兩個類在內部維護著一個工作線程或進程池,以及要執行的任務隊列。
Python 3.4 以后標準庫中asyncio 包,這個包使用事件循環驅動的協程實現并發。這是 Python 中最大也
是最具雄心壯志的庫之一。asyncio 大量使用 yield from 表達式,因此與
Python 舊版不兼容。
submit和map方法
submit方法作用是向線程池提交可回調的task,并返回一個回調實例。
example:
import time from concurrent.futures import ThreadPoolExecutor # 可回調的task def pub_task(msg): time.sleep(3) return msg # 創建一個線程池 pool = ThreadPoolExecutor(max_workers=3) # 往線程池加入2個task task1 = pool.submit(pub_task, 'a') task2 = pool.submit(pub_task, 'b') print(task1.done()) # False time.sleep(4) print(task2.done()) # True print(task1.result()) print(task2.result())
map方法是創建一個迭代器,回調的結果有序放在迭代器中。
問題:
Executor.map 函數易于使用,不過有個特性可能有用,也可能沒用,具體情況取決于需求:這個函數返回結果的順序與調用開始的順序一致。
如果第一個調用生成結果用時 10秒,而其他調用只用 1 秒,代碼會阻塞 10 秒,獲取 map 方法返回的生成器產出的第一個結果。
在此之后,獲取后續結果時不會阻塞,因為后續的調用已經結束。
如果必須等到獲取所有結果后再處理,這種行為沒問題;不過,通常更可取的方式是,不管提交的順序,只要有結果就獲取。
為此,要把 Executor.submit 方法和 futures.as_completed 函數結合起來使用。
from concurrent.futures import ThreadPoolExecutor import requests URLS = ['http://www.csdn.com', 'http://qq.com', 'http://www.leasonlove.cn'] def task(url, timeout=10): return requests.get(url, timeout=timeout) pool = ThreadPoolExecutor(max_workers=3) results = pool.map(task, URLS) for ret in results: print('%s, %s' % (ret.url, ret))
future異步編程
Future可以理解為一個在未來完成的操作,這是異步編程的基礎。通常情況下,我們執行io操作,訪問url時(如下)在等待結果返回之前會產生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed import requests URLS = ['http://www.csdn.cn', 'http://qq.com', 'http://www.leasonlove.cn'] def task(url, timeout=1): return requests.get(url, timeout=timeout) with ThreadPoolExecutor(max_workers=3) as executor: future_tasks = [executor.submit(task, url) for url in URLS] for f in future_tasks: if f.running(): print('%s is running' % str(f)) for f in as_completed(future_tasks): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, f_ret.content)) except Exception as e: # 第一個url無響應 f.cancel() print(str(e))
asyncio庫協程實現并發
對于gevent 和 asyncio 建議大家放棄Gevent,擁抱asyncio,asyncio是Python3.4以后標準庫。
而且由于Gevent直接修改標準庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。
但是我們無法保證你在復雜的生產環境中有哪些地方使用這些標準庫會由于打了補丁而出現奇怪的問題。
import asyncio import time start = time.time() async def do(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Finish after {}s'.format(x) task1 = do(1) task2 = do(2) task3 = do(4) tasks = [ asyncio.ensure_future(task1), asyncio.ensure_future(task2), asyncio.ensure_future(task3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task result: ', task.result()) end = time.time() print('TIME: ', end - start)
協程與線程
如果使用線程做過重要的編程,你就知道寫出程序有多么困難,因為調度程序任何時候都能中斷線程。
必須記住保留鎖,去保護程序中的重要部分,防止多步操作在執行的過程中中斷,防止數據處于無效狀態。
而協程默認會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程序的余下部分運行。
對協程來說,無需保留鎖,在多個線程之間同步操作,協程自身就會同步,因為在任意時刻只有一個協程運行。
想交出控制權時,可以使用 yield 或 yield from 把控制權交還調度程序。
這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執行清理操作。
補充知識:Python-什么時候使用yield?
簡介
很多時候在python代碼中見到了yield,沒有系統學習過,自己也沒有用過。
yield語句延遲了語句的執行,然后發送了一個值給調用者,但保留了一定的狀態去保證函數離開之后可以繼續。當繼續的時候,函數繼續執行上一個的運行狀態。這使得它的代碼可以隨著時間產生一系列的值,而不是立即執行,然后像一個list一樣發送他們回來。
例子
例子1:
# A Simple Python program to demonstrate working # of yield # A generator function that yields 1 for first time, # 2 second time and 3 third time def simpleGeneratorFun(): yield 1 yield 2 yield 3 # Driver code to check above generator function for value in simpleGeneratorFun(): print(value)
返回語句發送一個特殊的值給它的調用者,而yield產生了一系列的值,當我們想要遍歷一個序列的時候,我們應該使用yield,但不想要把整個序列存儲在內存中。
yield用于python的生成器(generator)。一個genertator 被定義得看起來像一個普通函數一樣,但它需要產生一個數字得時候,它使用yield,而不是使用return。如果一個函數里面定義了yield,那么它自動稱為了一個generator函數。、
例子2:
# A Python program to generate squares from 1 # to 100 using yield and therefore generator # An infinite generator function that prints # next square number. It starts with 1 def nextSquare(): i = 1; # An Infinite loop to generate squares while True: yield i*i i += 1 # Next execution resumes # from this point # Driver code to test above generator # function for num in nextSquare(): if num > 100: break print(num)
輸出1,4,9…100
看完上述內容,是不是對Python并發concurrent.futures和asyncio的用法有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。