您好,登錄后才能下訂單哦!
這篇文章主要講解了Python如何使用生成器代替線程,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。
問題
你想使用生成器(協程)替代系統線程來實現并發。這個有時又被稱為用戶級線程或綠色線程。
解決方案
要使用生成器實現自己的并發,你首先要對生成器函數和 yield 語句有深刻理解。 yield 語句會讓一個生成器掛起它的執行,這樣就可以編寫一個調度器, 將生成器當做某種“任務”并使用任務協作切換來替換它們的執行。 要演示這種思想,考慮下面兩個使用簡單的 yield 語句的生成器函數:
# Two simple generator functions def countdown(n): while n > 0: print('T-minus', n) yield n -= 1 print('Blastoff!') def countup(n): x = 0 while x < n: print('Counting up', x) yield x += 1
這些函數在內部使用yield語句,下面是一個實現了簡單任務調度器的代碼:
from collections import deque class TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): ''' Admit a newly started task to the scheduler ''' self._task_queue.append(task) def run(self): ''' Run until there are no more tasks ''' while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run()
TaskScheduler 類在一個循環中運行生成器集合——每個都運行到碰到yield語句為止。 運行這個例子,輸出如下:
T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...
到此為止,我們實際上已經實現了一個“操作系統”的最小核心部分。 生成器函數就是任務,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行為止。
實際上,你可能想要使用生成器來實現簡單的并發。 那么,在實現actor或網絡服務器的時候你可以使用生成器來替代線程的使用。
下面的代碼演示了使用生成器來實現一個不依賴線程的actor:
from collections import deque class ActorScheduler: def __init__(self): self._actors = {} # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): ''' Admit a newly started actor to the scheduler and give it a name ''' self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ''' Send a message to a named actor ''' actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ''' Run as long as there are pending messages. ''' while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass # Example use if __name__ == '__main__': def printer(): while True: msg = yield print('Got:', msg) def counter(sched): while True: # Receive the current count n = yield if n == 0: break # Send to the printer task sched.send('printer', n) # Send the next count to the counter task (recursive) sched.send('counter', n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor('printer', printer()) sched.new_actor('counter', counter(sched)) # Send an initial message to the counter to initiate sched.send('counter', 10000) sched.run()
完全弄懂這段代碼需要更深入的學習,但是關鍵點在于收集消息的隊列。 本質上,調度器在有需要發送的消息時會一直運行著。 計數生成器會給自己發送消息并在一個遞歸循環中結束。
下面是一個更加高級的例子,演示了使用生成器來實現一個并發網絡應用程序:
from collections import deque from select import select # This class represents a generic yield event in the scheduler class YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass # Task Scheduler class Scheduler: def __init__(self): self._numtasks = 0 # Total num of tasks self._ready = deque() # Tasks ready to run self._read_waiting = {} # Tasks waiting to read self._write_waiting = {} # Tasks waiting to write # Poll for I/O events and restart waiting tasks def _iopoll(self): rset,wset,eset = select(self._read_waiting, self._write_waiting,[]) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self,task): ''' Add a newly started task to the scheduler ''' self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): ''' Append an already started task to the ready queue. msg is what to send into the task when it resumes. ''' self._ready.append((task, msg)) # Add a task to the reading set def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) # Add a task to the write set def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): ''' Run the task scheduler until there are no tasks ''' while self._numtasks: if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: # Run the coroutine to the next yield r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError('unrecognized yield event') except StopIteration: self._numtasks -= 1 # Example implementation of coroutine-based socket I/O class ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data) class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent) class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r) # Wrapper around a socket object for use with yield class Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name) if __name__ == '__main__': from socket import socket, AF_INET, SOCK_STREAM import time # Example of a function involving generators. This should # be called using line = yield from readline(sock) def readline(sock): chars = [] while True: c = yield sock.recv(1) if not c: break chars.append(c) if c == b'\n': break return b''.join(chars) # Echo server using generators class EchoServer: def __init__(self,addr,sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self,addr): s = Socket(socket(AF_INET,SOCK_STREAM)) s.bind(addr) s.listen(5) while True: c,a = yield s.accept() print('Got connection from ', a) self.sched.new(self.client_handler(Socket(c))) def client_handler(self,client): while True: line = yield from readline(client) if not line: break line = b'GOT:' + line while line: nsent = yield client.send(line) line = line[nsent:] client.close() print('Client closed') sched = Scheduler() EchoServer(('',16000),sched) sched.run()
這段代碼有點復雜。不過,它實現了一個小型的操作系統。 有一個就緒的任務隊列,并且還有因I/O休眠的任務等待區域。 還有很多調度器負責在就緒隊列和I/O等待區域之間移動任務。
討論
在構建基于生成器的并發框架時,通常會使用更常見的yield形式:
def some_generator(): ... result = yield data ...
使用這種形式的yield語句的函數通常被稱為“協程”。 通過調度器,yield語句在一個循環中被處理,如下:
f = some_generator() # Initial result. Is None to start since nothing has been computed result = None while True: try: data = f.send(result) result = ... do some calculation ... except StopIteration: break
這里的邏輯稍微有點復雜。不過,被傳給 send()
的值定義了在yield語句醒來時的返回值。 因此,如果一個yield準備在對之前yield數據的回應中返回結果時,會在下一次 send()
操作返回。 如果一個生成器函數剛開始運行,發送一個None值會讓它排在第一個yield語句前面。
除了發送值外,還可以在一個生成器上面執行一個 close()
方法。 它會導致在執行yield語句時拋出一個 GeneratorExit
異常,從而終止執行。 如果進一步設計,一個生成器可以捕獲這個異常并執行清理操作。 同樣還可以使用生成器的 throw()
方法在yield語句執行時生成一個任意的執行指令。 一個任務調度器可利用它來在運行的生成器中處理錯誤。
最后一個例子中使用的 yield from
語句被用來實現協程,可以被其它生成器作為子程序或過程來調用。 本質上就是將控制權透明的傳輸給新的函數。 不像普通的生成器,一個使用 yield from
被調用的函數可以返回一個作為 yield from
語句結果的值。 關于 yield from 的更多信息可以在 PEP 380 中找到。
最后,如果使用生成器編程,要提醒你的是它還是有很多缺點的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執行CPU依賴或I/O阻塞程序, 它會將整個任務掛起直到操作完成。為了解決這個問題, 你只能選擇將操作委派給另外一個可以獨立運行的線程或進程。 另外一個限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個方案,你會發現你需要自己改寫很多標準庫函數。 作為本節提到的協程和相關技術的一個基礎背景,可以查看 PEP 342 和 “協程和并發的一門有趣課程”
PEP 3156 同樣有一個關于使用協程的異步I/O模型。 特別的,你不可能自己去實現一個底層的協程調度器。 不過,關于協程的思想是很多流行庫的基礎, 包括 gevent, greenlet, Stackless Python 以及其他類似工程。
看完上述內容,是不是對Python如何使用生成器代替線程有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。