您好,登錄后才能下訂單哦!
tornado中的協程是如何工作的
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 —— [ 維基百科 ]
我們在平常編程中,更習慣使用的是子例程(subroutine),通俗的叫法是函數,或者過程。子例程,往往只有一個入口(函數調用,實參通過傳給形參開始執行),一個出口(函數return,執行完畢,或者引發異常,將控制權轉移給調用者)。但協程是子例程基礎上,一種更加寬泛定義的計算機程序模塊(子例程可以看做協程的特例),它可以有多個入口點,允許從一個入口點,執行到下一個入口點之前暫停,保存執行狀態,等到合適的時機恢復執行狀態,從下一個入口點重新開始執行,這也是協程應該具有的能力。
定義
協程代碼塊
一個入口點和下一個入口點(或者退出點)中的代碼。
協程模塊
由n個入口點代碼,和n個協程代碼塊組成。第一個入口點通常是一個函 數入口點。其組織形式如:函數入口點->協程代碼塊->入口點->協程代碼塊…,入口點和代碼塊相間。
線性模塊
一個同步函數的函數體是線性執行的。也就是說一個模塊中的每一行代碼,相繼執行,一個模塊在執行中,如果還沒有執行完畢,不會去執行其他模塊的代碼。稱這樣的代碼模塊為線性模塊。
一個協程模塊,如果只含有單一入口點和單一協程代碼塊(假設這個協程代碼塊全是同步代碼),當然這個協程模塊是一個線性執行模塊,但是如果含有多個入口點和多個協程代碼塊,那么就不是一個線性模塊。那么執行一個協程模塊過程實際是分散的(不同的時間段,執行不同的協程代碼塊,協程代碼塊的執行時間段,彼此不相交),但也是順序的(后一個協程代碼塊在前一個協程代碼塊執行結束后才執行)。兩個屬于同一協程模塊的相繼協程代碼塊執行的中間時間間隙,可能有很多其他協程模塊的協程代碼片段在執行。
談到協程,必須要說說python語義中的生成器(generator)。
在pep255中提到了”simple generator”和”yield語句”(此時還不是”yield表達式”)的實現。一個basic idea,提供一種函數,能夠返回中間結果給調用者,然后維護函數的局部狀態,以便函數當離開后,也能恢復執行。
prep255中舉了一個簡單的例子,生成斐波那契數列:
def fib(): a, b = 0, 1 while 1: yield b a, b = b, a+b
a,b初始化為0,1。當yield b被執行,1被返回給調用者。當fib恢復執行,a,變成了1,b也是1,然后將1返回給調用者,如此循環。generator是一種非常自然的編程方式,因為對于fib來說,它的功能不變,都還是不斷生成下一個斐波那契數。而對于fib的調用者來說,fib像一個列表的迭代器,不斷迭代,可以獲取下一個斐波那契數。
def caller(): for num in fib(): print num
生成器是一個含有yield表達式的函數,此時該函數叫生成器。一個生成器永遠是異步的,即使生成器模塊中含有阻塞代碼。因為調用一個生成器,生成器的參數會綁定到生成器,結果返回是一個生成器對象,它的類型是types.GeneratorType,不會去執行生成器主模塊中的代碼。
每次調用一個GeneratorType對象的next方法,生成器函數執行到下一個yield語句或者,或者碰到一個return語句,或者執行到生成器函數結束。
在pep342中,對Generator進一步加強,增加了GeneratorType的send方法,和yield表達式語義。yield表達式,可以作為等號右邊的表達式。如果對Generator調用send(None)方法,生成器函數會從開始一直執行到yield表達式。那么下一次對Generator調用send(argument),Generator恢復執行。那么可以在生成器函數體內獲得這個argument,這個argument將會作為yield表達式的返回值。
從上面可以看到,Generator已經具備協程的一些能力。如:能夠暫停執行,保存狀態;能夠恢復執行;能夠異步執行。
但是此時Generator還不是一個協程。一個真正的協程能夠控制代碼什么時候繼續執行。而一個Generator執行遇到一個yield表達式 或者語句,會將執行控制權轉移給調用者。
However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 —— [ 維基百科 ]
在維基百科中提到,可以實現一個頂級的調度子例程,將執行控制權轉移回Generator,從而讓它繼續執行。在tornado中,ioLoop就是這樣的頂級調度子例程,每個協程模塊通過,函數裝飾器coroutine和ioLoop進行通信,從而ioLoop可以在協程模塊執行暫停后,在合適的時機重新調度協程模塊執行。
不過,接下來還不能介紹coroutine和ioLoop,在介紹這兩者之前,先得明白tornado中在協程環境中一個非常重要的類Future.
Future類位于tornado源碼的concurrent模塊中。Future類的完整代碼,請查看tornado的源碼。在這里截取一部分代碼作為分析之用
class Future(object): def done(self): return self._done def result(self, timeout=None): self._clear_tb_log() if self._result is not None: return self._result if self._exc_info is not None: raise_exc_info(self._exc_info) self._check_done() return self._result def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception('exception calling callback %r for %r', cb, self) self._callbacks = None
Future類重要成員函數:
def done(self):
Future的_result成員是否被設置
def result(self, timeout=None):
獲取Future對象的結果
def add_done_callback(self, fn):
添加一個回調函數fn給Future對象。如果這個Future對象已經done,則直接執行fn,否則將fn加入到Future類的一個成員列表中保存。
def _set_done(self):
一個內部函數,主要是遍歷列表,逐個調用列表中的callback函數,也就是前面add_done_calback加如來的。
def set_result(self, result):
給Future對象設置result,并且調用_set_done。也就是說,當Future對象獲得result后,所有add_done_callback加入的回調函數就會執行。
Future封裝了異步操作的結果。實際是它類似于在網頁html前端中,圖片異步加載的占位符,但加載后最終也是一個完整的圖片。Future也是同樣用處,tornado使用它,最終希望它被set_result,并且調用一些回調函數。Future對象實際是coroutine函數裝飾器和IOLoop的溝通使者,有著非常重要的作用。
tornado框架的底層核心類,位于tornado的ioloop模塊。功能方面類似win32窗口的消息循環。每個窗口可以綁定一個窗口過程。窗口過程主要是一個消息循環在執行。消息循環主要任務是利用PeekMessage系統調用,從消息隊列中取出各種類型的消息,判斷消息的類型,然后交給特定的消息handler進行執行。
tornado中的IOLoop與此相比具有很大的相似性,在協程運行環境中擔任著協程調度器的角色, 和win32的消息循環本質上都是一種事件循環,等待事件,然后運行對應的事件處理器(handler)。不過IOLoop主要調度處理的是IO事件(如讀,寫,錯誤)。除此之外,還能調度callback和timeout事件。
在本博文中,我們暫時只關注callback事件,因為這個與協程調度的相關性最大。
def add_future(self, future, callback): assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
add_future函數在基類IOLoop中實現,函數參數是一個Future對象和一個callback函數。當Future對象被set_result,執行一個回調函數,是個lambda函數,在lambda函數中調用IOLoop的add_callback函數。將add_future的參數callback加入到IOLoop的統一調度中,讓callback在IOLoop下一次迭代中執行。
def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: self._waker.wake()
add_callback函數主要在IOLoop的子類PollIOLoop中實現。也很容易理解。
將傳入的callback函數,利用偏函數進行包裝,將所有callback真正運行時需要的參數,都綁定到生成的偏函數中,實際上就是找個地方把callback運行時需要的參數保存起來。將包裝好的偏函數加入到回調函數列表。當IOLoop下一次迭代運行的時候,遍歷callback函數列表,運行偏函數的時候,就不再需要傳入參數執行,效果等同于用實參運行callback。
IOLoop對象調用start函數,會運行event loop。在event loop中,首先遍歷callback列表,執行回調函數,然后遍歷timeout列表,執行timeoutCallback。最后才執行ioHandler。
函數裝飾器本質是一個函數,我們稱這個函數為裝飾器函數。裝飾器函數簽名含有一個 函數對象(可調用對象callable)參數,返回的結果是一個裝飾器內部定義的一個新函數對象。如果返回的函數對象被調用,裝飾器函數的參數(函數對象)也會被調用。不過,會在這個參數(裝飾器函數參數)調用前做一些事情,或者在這個參數調用后做一些事情。實際上做的這些事情,就是利用內部自定義的函數對象對參數(原函數)的一些裝飾(額外操作)
當一個函數被裝飾器裝飾。那么以后調用這個函數(此函數已經非彼函數)的時候,實際上調用的是裝飾器函數返回的內部函數對象。理解tornado中coroutine修飾的函數如何執行,主要是 理解coroutine這個裝飾器函數內部定義的新函數對象所做的那些事兒。
def coroutine(func, replace_callback=True): return _make_coroutine_wrapper(func, replace_callback=True)
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run() def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: try: value = future.result() except Exception: self.had_exception = True yielded = self.gen.throw(*sys.exc_info()) else: yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None return if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): try: self.future = convert_yielded(yielded) except BadYieldError: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
以上的代碼其實都對源碼進行了一些調整。但函數調用進入到Runner的構造函數的時候,也就是說Generator的第一次執行已經完畢。那么接下來,調用的是,handle_yield,對第一次Generator執行的返回結果進行處理。當然返回的結果可能是多種類型。可能是一個Future對象,list,dict,或者其他類型對象,或者普通類型。通過convert_yield,self.future保存的是一個Future對象的引用(第一次Generator執行返回的結果)。此時如果self.future還沒被set_result。對為self.future綁定一個done_callback(lambda f: self.run()),加入到self.io_loop中。
在前文說到。ioloop的add_future函數中,實際上是只有當參數future,在某個地方調用了set_result, 才在執行done_callback時,將參數callback加入到IOLoop中調度。換句話說。Runner類中,self.run要等到self.future在某個代碼塊被set_result,IOLoop才有可能在下一次迭代的時候執行它,從而調度協程繼續恢復執行。而在self.run函數中,我們可以看到將會通過Generator的send函數,恢復執行下一個協程代碼塊。所以關鍵的問題是我們需要明白Runner類中self.future,在什么時候被set_result?
從這里我們可以看到Future類的重要作用。future.set_result起到的作用是:
發送一個信號,告訴IOLoop去調度暫停的協程繼續執行。
我們結合下面的代碼例子就可以明白協程調度的整個流程是如何進行的了。
import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future @coroutine def asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) tornado.ioloop.IOLoop.instance().add_callback(callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result)) def main(): asyn_sum(2,3) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
實際的運行場景是:一個協程(asyn_sum)遇到yield表達式被暫停執行后,IOLoop調用另外一個代碼段(asyn_sum中的回調函數callback)執行,而在callback中,剛好可以訪問到屬于被暫停協程(asyn_sum)中的future對象(也就是Runner對象中的self.future的引用),callback中將future調用set_result,那么這個暫停的協程(asyn_sum)在IOLoop下一次迭代調度回調函數時中,被恢復執行。
tornado中的協程實現基于python語言的Generator并且結合一個全局的調度器IOLoop,Generator通過函數裝飾器coroutine和IOLoop進行通信。IOLoop并沒有直接控制能力,調度恢復被暫停的協程繼續執行。future對象在協程中被yield。協程暫停,IOLoop調度另外一個代碼模塊執行,而在這個執行的代碼模塊中剛好,可以訪問這個future對象,將其set_result,結果通過IOLoop間接恢復暫停協程執行。不同執行代碼模塊中,共享future對象,彼此合作,協程調度得順利執行。
從這種意義上來說,future對象,像window中的Event內核對象的作用。window中的event用于線程中同步。而協程中的yield future相當于WaitForSingleObject(event_object), 而future.set_result(result)。相當于SetEvent(event_object)。而future和Event的不同點在于,協程借future來恢復執行,而線程借Event來進行線程間同步。
以上就是本文關于詳細解讀tornado協程(coroutine)原理的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。