您好,登錄后才能下訂單哦!
這篇文章主要介紹“python中進程間通信及怎么設置狀態量控制另一個進程”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“python中進程間通信及怎么設置狀態量控制另一個進程”文章能幫助大家解決問題。
業務場景:在當前遇到的業務場景中,我們需要啟一個間隔任務,這個間隔任務跑一個算法,然后把算法的結果進行一些處理,并入庫。任務目前間隔是一小時,算法運行時間要50多分鐘,留給結果處理的時間并不多,所以有可能會出現超時。目前來說,優化方向在算法上會更為合理,因為結果處理本來就不用很多時間。但是在這個業務場景下,想要把結果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現的,說是壓縮為0,實際上就是在算法執行完成后,再啟一個進程去處理,這樣就不會由于需要進行數據處理而影響到算法的運行,將算法和結果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結果處理作為一個周期,而現在是把算法運行和結果處理分為兩個周期去處理。
技術實現方案:
啟動二個進程,其中一個運行算法,在算法運行結束后,發送一個狀態值到另外一個進程,另外一個進程在收到狀態量后啟動數據處理即可。兩個進程間互不影響即可。其實也相當于算法進程控制數據處理進程
測試場景構造代碼:
from multiprocessing import Process,Pipe import time import sys import os def send_message(conn): for i in range(1000): print('send_message:%d'%i) print(os.getpid()) conn.send(i) time.sleep(3) def send_message1(conn): # for i in range(1000): print(conn.recv()) while True: if conn.recv() % 5 == 0: print(' today is nice day') time.sleep(1) if __name__ == '__main__': #創建一個進程通信管道 left,right = Pipe() t1 = Process(target=send_message,args=(left,)) t2 = Process(target=send_message1,args=(right,)) t1.start() t2.start()
在這個案例場景下有一些需要注意的點:
一、time.sleep()的問題,睡眠指定時間,總是會出錯,具體的出錯原因到現在也沒有找到,這是原來出現的問題,在這里沒有做長時間的測試,所以不一定會出現,但是還是要注意
二、代碼實現中與上述的描述差異有一些,如未啟用調度任務,只是啟了一個間隔運行的任務。
三、數據處理進程一直處理空跑狀態,會造成資源的浪費(更合理的應該是形成阻塞狀態,但是對于阻塞狀態的構造缺乏認知,所以先犧牲資源
四、在上述描述的需求中,在算法運行及數據處理的上一節點還有一個調度任務在控制,這里未做出體現,其實應該把定時任務和數據處理作為兩個周期獨立出來才更符合上述描述中的需求。
業務場景:在當前遇到的業務場景中,我們需要啟一個間隔任務,這個間隔任務跑一個算法,然后把算法的結果進行一些處理,并入庫。任務目前間隔是一小時,算法運行時間要50多分鐘,留給結果處理的時間并不多,所以有可能會出現超時。目前來說,優化方向在算法上會更為合理,因為結果處理本來就不用很多時間。但是在這個業務場景下,想要把結果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現的,說是壓縮為0,實際上就是在算法執行完成后,再啟一個進程去處理,這樣就不會由于需要進行數據處理而影響到算法的運行,將算法和結果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結果處理作為一個周期,而現在是把算法運行和結果處理分為兩個周期去處理。
上面的解決方案中只涉及到了啟用兩個進程去運行兩個任務,并未涉及到啟用定時任務框架,所以可能會顯得和上述的業務場景不一致,所以在這里重新解決一下。上面也是沒有問題的,只是把定時任務框架也作為一個任務去處理即可。然后在定時任務運行完程后,向另外一個進程傳入一個參數,作為啟動另一個進程的狀態量即可。當然,在這里,兩個進程還是完全占滿的,即處理阻塞狀態。對于資源的利用還是沒有完全達到最好。后續再考慮使用進程池的方式,看是否可以讓其中的一個進程運行完后直接釋放資源。
技術解決方案如下:
from multiprocessing import Process,Pipe import time from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler # schedule = BackgroundScheduler() schedule = BlockingScheduler(timezone="Asia/Shanghai") # schedule = AsyncIOScheduler(timezone="Asia/Shanghai") def algorithm(conn): print('start_run') conn.send('please run') # time.sleep(5) def worth_result(conn): while True: if conn.recv() == 'please run': print(conn.recv() + ' very nice!') def time_job(conns): schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,)) schedule.start() if __name__ == '__main__': left,right = Pipe() t1 = Process(target=time_job,args=(left,)) t2 = Process(target=worth_result,args=(right,)) t1.start() t2.start()
在這里還有一些點需要說明,定時任務選擇那一種類型其實都沒有關系,阻塞和非阻塞其實沒有關系,因為我們在這里是直接啟了兩個進程,每個進程間是相互獨立的,并非是在定時任務下啟用的兩個進程,所以不會影響的。
關于這個解決方案還有的問題:
一、上述所說,兩個進程是占滿的,所以對于資源來說,兩個進程的利用率一直很高
二、擴展性不足,如果在這個程序中還有其他需要處理的過程,就需要再添加進程,或者把他添加到當前的進程之下,代碼重構會比較麻煩一些
三、整個任務的控制不足,需要加以完善。比如對于運行狀態一些控制及查看,一般程序如果運行時間較長的話,我們應該添加這樣的接口,否則啟動后如果沒有出結果,我們是不知道其運行狀態,有一點被動
四、關于三,使用logging庫,應該是可以直接去輸出其日志,但是日志庫作為第三方庫,相當于是對整個運行狀態進行監控,會不會再占用一個進程,這個需要去測試
五、完備性及容災處理,如果程序由于資源等其他問題掛掉后,會有一些數據冗余下來,也就是一些算法未進行處理,這個時候需要考慮怎么樣去補數據?原始文件如果沒有保留下來呢?而且如果這些數據是極重要的數據該怎么處理?如果程序掛掉后,應該如何快速的去處理呢?直接重啟嗎?
六、如果數據處理的進程所用的時間比算法還多,那該怎么辦?目前的業務來看,是遠低于的,但是如果是遠高于呢?可否將處理工作進行分配,利用多臺機器來處理,然后再把結果合并起來?
分布式處理的思想越來越濃。
關于“python中進程間通信及怎么設置狀態量控制另一個進程”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。