亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python如何使用生成器代替線程

發布時間:2020-08-04 13:56:54 來源:億速云 閱讀:132 作者:小豬 欄目:開發技術

這篇文章主要講解了Python如何使用生成器代替線程,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。

問題

你想使用生成器(協程)替代系統線程來實現并發。這個有時又被稱為用戶級線程或綠色線程。

解決方案

要使用生成器實現自己的并發,你首先要對生成器函數和 yield 語句有深刻理解。 yield 語句會讓一個生成器掛起它的執行,這樣就可以編寫一個調度器, 將生成器當做某種“任務”并使用任務協作切換來替換它們的執行。 要演示這種思想,考慮下面兩個使用簡單的 yield 語句的生成器函數:

# Two simple generator functions
def countdown(n):
  while n > 0:
    print('T-minus', n)
    yield
    n -= 1
  print('Blastoff!')

def countup(n):
  x = 0
  while x < n:
    print('Counting up', x)
    yield
    x += 1

這些函數在內部使用yield語句,下面是一個實現了簡單任務調度器的代碼:

from collections import deque

class TaskScheduler:
  def __init__(self):
    self._task_queue = deque()

  def new_task(self, task):
    '''
    Admit a newly started task to the scheduler
    '''
    self._task_queue.append(task)

  def run(self):
    '''
    Run until there are no more tasks
    '''
    while self._task_queue:
      task = self._task_queue.popleft()
      try:
        # Run until the next yield statement
        next(task)
        self._task_queue.append(task)
      except StopIteration:
        # Generator is no longer executing
        pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

TaskScheduler 類在一個循環中運行生成器集合——每個都運行到碰到yield語句為止。 運行這個例子,輸出如下:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

到此為止,我們實際上已經實現了一個“操作系統”的最小核心部分。 生成器函數就是任務,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行為止。

實際上,你可能想要使用生成器來實現簡單的并發。 那么,在實現actor或網絡服務器的時候你可以使用生成器來替代線程的使用。

下面的代碼演示了使用生成器來實現一個不依賴線程的actor:

from collections import deque

class ActorScheduler:
  def __init__(self):
    self._actors = {}     # Mapping of names to actors
    self._msg_queue = deque()  # Message queue

  def new_actor(self, name, actor):
    '''
    Admit a newly started actor to the scheduler and give it a name
    '''
    self._msg_queue.append((actor,None))
    self._actors[name] = actor

  def send(self, name, msg):
    '''
    Send a message to a named actor
    '''
    actor = self._actors.get(name)
    if actor:
      self._msg_queue.append((actor,msg))

  def run(self):
    '''
    Run as long as there are pending messages.
    '''
    while self._msg_queue:
      actor, msg = self._msg_queue.popleft()
      try:
         actor.send(msg)
      except StopIteration:
         pass

# Example use
if __name__ == '__main__':
  def printer():
    while True:
      msg = yield
      print('Got:', msg)

  def counter(sched):
    while True:
      # Receive the current count
      n = yield
      if n == 0:
        break
      # Send to the printer task
      sched.send('printer', n)
      # Send the next count to the counter task (recursive)
      sched.send('counter', n-1)

  sched = ActorScheduler()
  # Create the initial actors
  sched.new_actor('printer', printer())
  sched.new_actor('counter', counter(sched))

  # Send an initial message to the counter to initiate
  sched.send('counter', 10000)
  sched.run()

完全弄懂這段代碼需要更深入的學習,但是關鍵點在于收集消息的隊列。 本質上,調度器在有需要發送的消息時會一直運行著。 計數生成器會給自己發送消息并在一個遞歸循環中結束。

下面是一個更加高級的例子,演示了使用生成器來實現一個并發網絡應用程序:

from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
  def handle_yield(self, sched, task):
    pass

  def handle_resume(self, sched, task):
    pass

# Task Scheduler
class Scheduler:
  def __init__(self):
    self._numtasks = 0    # Total num of tasks
    self._ready = deque()  # Tasks ready to run
    self._read_waiting = {} # Tasks waiting to read
    self._write_waiting = {} # Tasks waiting to write

  # Poll for I/O events and restart waiting tasks
  def _iopoll(self):
    rset,wset,eset = select(self._read_waiting,
                self._write_waiting,[])
    for r in rset:
      evt, task = self._read_waiting.pop(r)
      evt.handle_resume(self, task)
    for w in wset:
      evt, task = self._write_waiting.pop(w)
      evt.handle_resume(self, task)

  def new(self,task):
    '''
    Add a newly started task to the scheduler
    '''
    self._ready.append((task, None))
    self._numtasks += 1

  def add_ready(self, task, msg=None):
    '''
    Append an already started task to the ready queue.
    msg is what to send into the task when it resumes.
    '''
    self._ready.append((task, msg))

  # Add a task to the reading set
  def _read_wait(self, fileno, evt, task):
    self._read_waiting[fileno] = (evt, task)

  # Add a task to the write set
  def _write_wait(self, fileno, evt, task):
    self._write_waiting[fileno] = (evt, task)

  def run(self):
    '''
    Run the task scheduler until there are no tasks
    '''
    while self._numtasks:
       if not self._ready:
         self._iopoll()
       task, msg = self._ready.popleft()
       try:
         # Run the coroutine to the next yield
         r = task.send(msg)
         if isinstance(r, YieldEvent):
           r.handle_yield(self, task)
         else:
           raise RuntimeError('unrecognized yield event')
       except StopIteration:
         self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
  def __init__(self, sock, nbytes):
    self.sock = sock
    self.nbytes = nbytes
  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)
  def handle_resume(self, sched, task):
    data = self.sock.recv(self.nbytes)
    sched.add_ready(task, data)

class WriteSocket(YieldEvent):
  def __init__(self, sock, data):
    self.sock = sock
    self.data = data

  def handle_yield(self, sched, task):
    sched._write_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    nsent = self.sock.send(self.data)
    sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
  def __init__(self, sock):
    self.sock = sock

  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    r = self.sock.accept()
    sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
  def __init__(self, sock):
    self._sock = sock

  def recv(self, maxbytes):
    return ReadSocket(self._sock, maxbytes)

  def send(self, data):
    return WriteSocket(self._sock, data)

  def accept(self):
    return AcceptSocket(self._sock)

  def __getattr__(self, name):
    return getattr(self._sock, name)

if __name__ == '__main__':
  from socket import socket, AF_INET, SOCK_STREAM
  import time

  # Example of a function involving generators. This should
  # be called using line = yield from readline(sock)
  def readline(sock):
    chars = []
    while True:
      c = yield sock.recv(1)
      if not c:
        break
      chars.append(c)
      if c == b'\n':
        break
    return b''.join(chars)

  # Echo server using generators
  class EchoServer:
    def __init__(self,addr,sched):
      self.sched = sched
      sched.new(self.server_loop(addr))

    def server_loop(self,addr):
      s = Socket(socket(AF_INET,SOCK_STREAM))

      s.bind(addr)
      s.listen(5)
      while True:
        c,a = yield s.accept()
        print('Got connection from ', a)
        self.sched.new(self.client_handler(Socket(c)))

    def client_handler(self,client):
      while True:
        line = yield from readline(client)
        if not line:
          break
        line = b'GOT:' + line
        while line:
          nsent = yield client.send(line)
          line = line[nsent:]
      client.close()
      print('Client closed')

  sched = Scheduler()
  EchoServer(('',16000),sched)
  sched.run()

這段代碼有點復雜。不過,它實現了一個小型的操作系統。 有一個就緒的任務隊列,并且還有因I/O休眠的任務等待區域。 還有很多調度器負責在就緒隊列和I/O等待區域之間移動任務。

討論

在構建基于生成器的并發框架時,通常會使用更常見的yield形式:

def some_generator():
  ...
  result = yield data
  ...

使用這種形式的yield語句的函數通常被稱為“協程”。 通過調度器,yield語句在一個循環中被處理,如下:

f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
  try:
    data = f.send(result)
    result = ... do some calculation ...
  except StopIteration:
    break

這里的邏輯稍微有點復雜。不過,被傳給 send() 的值定義了在yield語句醒來時的返回值。 因此,如果一個yield準備在對之前yield數據的回應中返回結果時,會在下一次 send() 操作返回。 如果一個生成器函數剛開始運行,發送一個None值會讓它排在第一個yield語句前面。

除了發送值外,還可以在一個生成器上面執行一個 close() 方法。 它會導致在執行yield語句時拋出一個 GeneratorExit 異常,從而終止執行。 如果進一步設計,一個生成器可以捕獲這個異常并執行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語句執行時生成一個任意的執行指令。 一個任務調度器可利用它來在運行的生成器中處理錯誤。

最后一個例子中使用的 yield from 語句被用來實現協程,可以被其它生成器作為子程序或過程來調用。 本質上就是將控制權透明的傳輸給新的函數。 不像普通的生成器,一個使用 yield from 被調用的函數可以返回一個作為 yield from 語句結果的值。 關于 yield from 的更多信息可以在 PEP 380 中找到。

最后,如果使用生成器編程,要提醒你的是它還是有很多缺點的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執行CPU依賴或I/O阻塞程序, 它會將整個任務掛起直到操作完成。為了解決這個問題, 你只能選擇將操作委派給另外一個可以獨立運行的線程或進程。 另外一個限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個方案,你會發現你需要自己改寫很多標準庫函數。 作為本節提到的協程和相關技術的一個基礎背景,可以查看 PEP 342 和 “協程和并發的一門有趣課程”

PEP 3156 同樣有一個關于使用協程的異步I/O模型。 特別的,你不可能自己去實現一個底層的協程調度器。 不過,關于協程的思想是很多流行庫的基礎, 包括 gevent, greenlet, Stackless Python 以及其他類似工程。

看完上述內容,是不是對Python如何使用生成器代替線程有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

得荣县| 鹤峰县| 蒙山县| 屯留县| 克山县| 镶黄旗| 大姚县| 繁昌县| 阿合奇县| 怀化市| 安阳市| 宝坻区| 武夷山市| 巨鹿县| 兴城市| 东乡族自治县| 涿鹿县| 东安县| 金川县| 右玉县| 绥中县| 舟山市| 朝阳市| 万全县| 怀柔区| 祁连县| 宁强县| 南澳县| 奉化市| 教育| 哈密市| 师宗县| 神农架林区| 万盛区| 抚远县| 睢宁县| 西乌| 固始县| 汝州市| 汽车| 丹江口市|