This article explains in detail how to implement multithreading and multiprocessing in Python. It is shared here as a reference; after reading it you should have a working understanding of the topic.
1. GIL
GIL stands for Global Interpreter Lock. In CPython, each Python thread corresponds to one C-level (operating system) thread. The GIL ensures that only one thread executes Python bytecode at any given moment, so multiple threads cannot be spread across multiple CPU cores; at most it makes a single bytecode instruction atomic. The GIL is not held continuously: the interpreter releases it periodically based on a time slice or the number of bytecodes executed, and around blocking IO operations. Threads are therefore well suited to IO-bound work, but not to CPU-bound work, and as the example below shows, the GIL is not enough to make a compound operation like += thread-safe.
For example:
import threading

var_total = 0

def add():
    global var_total
    for var_i in range(1000000):
        var_total += 1

def desc():
    global var_total
    for var_i in range(1000000):
        var_total -= 1

if __name__ == '__main__':
    var_thread1 = threading.Thread(target=add)
    var_thread2 = threading.Thread(target=desc)
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()
    print(var_total)
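Running the example above usually does not print 0: var_total += 1 compiles to several bytecodes, and the GIL can be released between them. The interval at which CPython asks a running thread to give up the GIL can be inspected and tuned with the standard sys module; a minimal sketch (the 0.01 value is only an illustration):

import sys

# CPython asks the running thread to release the GIL every "switch interval";
# the default is 0.005 seconds
print(sys.getswitchinterval())

# a larger interval means fewer thread switches but worse responsiveness
sys.setswitchinterval(0.01)
print(sys.getswitchinterval())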
2. Multithreaded programming
A thread is the smallest unit the operating system can schedule, and scheduling a thread is more lightweight than scheduling a process. For IO-bound work, multiprocessing and multithreading perform about the same.
Multithreading implementation - using a target function
import time
import threading

def getDetailHtml(var_url):
    print('get detail html started')
    time.sleep(2)
    print('get detail html end')

def getDetailUrl(var_url):
    print('get detail url started')
    time.sleep(4)
    print('get detail url end')

if __name__ == '__main__':
    var_thread1 = threading.Thread(target=getDetailHtml, args=('http://www.baidu.com',))
    var_thread2 = threading.Thread(target=getDetailUrl, args=('http://www.baidu.com',))
    var_start_time = time.time()
    # make the thread a daemon so it is terminated when the main thread exits
    # var_thread1.setDaemon(True)
    var_thread2.setDaemon(True)
    var_thread1.start()
    var_thread2.start()
    # join() blocks the main thread until the child thread finishes
    # var_thread1.join()
    # var_thread2.join()
    print('last time:{}'.format(time.time() - var_start_time))
Multithreading implementation - subclassing Thread
import threading
import time

class GetDetailHtml(threading.Thread):
    def __init__(self, var_name):
        super().__init__(name=var_name)

    def run(self):
        print('get detail html started')
        time.sleep(2)
        print('get detail html end')

class GetDetailUrl(threading.Thread):
    def run(self):
        print('get detail url started')
        time.sleep(4)
        print('get detail url end')

if __name__ == '__main__':
    var_thread1 = GetDetailHtml('getDetailHtml')
    var_thread2 = GetDetailUrl()
    var_start_time = time.time()
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()
    print('last time:{}'.format(time.time() - var_start_time))
3. Inter-thread communication with Queue
from queue import Queue
import threading
import time

def setDetailUrl(var_detail_queue):
    while True:
        time.sleep(2)
        for var_i in range(5):
            var_detail_queue.put('https://www.{}.com'.format(var_i))

def getDetailUrl(var_detail_queue, var_thread_name):
    while True:
        var_url = var_detail_queue.get()
        time.sleep(2)
        print(var_thread_name, ':', var_url)

if __name__ == '__main__':
    var_detail_queue = Queue(maxsize=10000)
    var_set_detail = threading.Thread(target=setDetailUrl, args=(var_detail_queue,))
    var_get_thread_list = []
    for var_index in range(5):
        var_get_thread = threading.Thread(target=getDetailUrl, args=(var_detail_queue, 'thread' + str(var_index)))
        var_get_thread_list.append(var_get_thread)
    var_set_detail.start()
    for var_one in var_get_thread_list:
        var_one.start()
    var_set_detail.join()
    for var_one in var_get_thread_list:
        var_one.join()
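Queue also provides task_done() and join() for knowing when every queued item has been processed. The following is a minimal sketch (the worker function and the items are invented for illustration, not part of the original example):

from queue import Queue
import threading

var_queue = Queue()

def worker():
    while True:
        var_item = var_queue.get()
        print('processed', var_item)
        # mark the item as done so var_queue.join() can return
        var_queue.task_done()

# a daemon worker lets the program exit once the queue is drained
threading.Thread(target=worker, daemon=True).start()

for var_i in range(5):
    var_queue.put(var_i)

# block until every put() item has been matched by a task_done()
var_queue.join()
print('all items processed')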
4. Locks between threads: Lock and RLock
import threading
from threading import Lock

var_total = 0
var_lock = Lock()

def add():
    global var_total
    for var_i in range(1000000):
        # locks cost performance and can deadlock (resource contention can also deadlock);
        # a Lock cannot be acquired twice in a row by the same thread, or it deadlocks
        var_lock.acquire()
        var_total += 1
        var_lock.release()

def reduce():
    global var_total
    for var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1
        var_lock.release()

if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()
    print(var_total)
import threading
from threading import RLock

var_total = 0
var_lock = RLock()

def add():
    global var_total
    for var_i in range(1000000):
        # an RLock can be acquired multiple times by the same thread,
        # as long as the number of acquire calls equals the number of release calls
        var_lock.acquire()
        var_lock.acquire()
        var_total += 1
        var_lock.release()
        var_lock.release()

def reduce():
    global var_total
    for var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1
        var_lock.release()

if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()
    print(var_total)
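Both Lock and RLock also support the with statement, which releases the lock even if the body raises an exception. A minimal sketch of the same counter written with the context-manager form (not part of the original code):

import threading

var_total = 0
var_lock = threading.Lock()

def add():
    global var_total
    for var_i in range(1000000):
        # 'with' acquires the lock and always releases it, even on exceptions
        with var_lock:
            var_total += 1

def reduce():
    global var_total
    for var_i in range(1000000):
        with var_lock:
            var_total -= 1

if __name__ == '__main__':
    var_t1 = threading.Thread(target=add)
    var_t2 = threading.Thread(target=reduce)
    var_t1.start()
    var_t2.start()
    var_t1.join()
    var_t2.join()
    print(var_total)  # always 0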
5. Thread synchronization: Condition and Semaphore
from threading import Thread, Condition

class XiaoAi(Thread):
    def __init__(self, var_con):
        super().__init__(name='XiaoAi')
        self.var_con = var_con

    def run(self):
        with self.var_con:
            self.var_con.wait()
            print('{}: here'.format(self.name))
            self.var_con.notify()
            self.var_con.wait()
            print('{}: sure'.format(self.name))

class TianMao(Thread):
    def __init__(self, var_con):
        super().__init__(name='TianMao')
        self.var_con = var_con

    def run(self):
        with self.var_con:
            # notify() and wait() must be called inside the with block, i.e. while holding the condition
            print('{}: XiaoAi?'.format(self.name))
            # if TianMao started first, this notify would be sent before XiaoAi is waiting and be lost
            self.var_con.notify()
            self.var_con.wait()
            print('{}: let us recite some poetry'.format(self.name))
            self.var_con.notify()

if __name__ == '__main__':
    var_con = Condition()
    var_tianmao = TianMao(var_con)
    var_xiaoai = XiaoAi(var_con)
    # note the start order: XiaoAi must be waiting before TianMao notifies
    var_xiaoai.start()
    var_tianmao.start()
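Condition also provides wait_for(), which keeps waiting until a predicate becomes true and so avoids the start-order problem noted above. A minimal sketch (the shared list and the function names are invented for illustration):

from threading import Thread, Condition

var_items = []
var_con = Condition()

def producer():
    with var_con:
        var_items.append('greeting')
        # wake up any consumer waiting on the condition
        var_con.notify()

def consumer():
    with var_con:
        # wait_for re-checks the predicate each time it is woken, and also checks it
        # before waiting, so it works even if notify() happened before wait() started
        var_con.wait_for(lambda: len(var_items) > 0)
        print(var_items.pop())

if __name__ == '__main__':
    var_c = Thread(target=consumer)
    var_p = Thread(target=producer)
    var_c.start()
    var_p.start()
    var_c.join()
    var_p.join()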
# A Semaphore is a lock that limits how many threads can enter at once
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, var_url, var_sem):
        super().__init__()
        self.var_url = var_url
        self.var_sem = var_sem

    def run(self):
        time.sleep(2)
        print(self.var_url)
        self.var_sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, var_sem):
        super().__init__()
        self.var_sem = var_sem

    def run(self):
        for var_i in range(20):
            self.var_sem.acquire()
            var_thread = HtmlSpider('https://www.baidu.com{}'.format(var_i), self.var_sem)
            var_thread.start()

if __name__ == '__main__':
    var_sem = threading.Semaphore(5)
    var_url_producer = UrlProducer(var_sem)
    var_url_producer.start()
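When the same thread both acquires and releases the semaphore, it can also be used as a context manager. A minimal alternative sketch (not the original crawler code; the fetch function is invented for illustration):

import threading
import time

var_sem = threading.Semaphore(5)

def fetch(var_url):
    # at most 5 threads can be inside this block at the same time
    with var_sem:
        time.sleep(2)
        print(var_url)

if __name__ == '__main__':
    var_threads = [threading.Thread(target=fetch, args=('https://www.baidu.com{}'.format(var_i),))
                   for var_i in range(20)]
    for var_t in var_threads:
        var_t.start()
    for var_t in var_threads:
        var_t.join()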
6. Thread pools
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

'''
With a thread pool, the main thread can check the state of a worker thread or of a task
and read its return value, and it learns immediately when a task finishes.
concurrent.futures also gives multithreading and multiprocessing the same interface.
'''

def getHtml(var_times):
    time.sleep(var_times)
    print('get page {} success'.format(var_times))
    return var_times

if __name__ == '__main__':
    var_execute = ThreadPoolExecutor(max_workers=1)
    # submit() hands the function to the pool; it is non-blocking and returns a Future immediately
    var_task1 = var_execute.submit(getHtml, 2)
    var_task2 = var_execute.submit(getHtml, 3)
    # done() tells whether the task has finished
    print(var_task1.done())
    print(var_task2.done())
    # a task that has not started yet can still be cancelled
    print(var_task2.cancel())
    time.sleep(3)
    print(var_task1.done())
    print(var_task2.done())
    # result() returns the task's return value
    print(var_task1.result())
    print('*' * 20)

    # collect the return values of finished tasks
    var_execute_01 = ThreadPoolExecutor(max_workers=2)
    var_times = [3, 2, 5, 6, 4]
    # list comprehension
    var_tasks = [var_execute_01.submit(getHtml, var_one_time) for var_one_time in var_times]
    # wait() blocks until the tasks complete
    wait(var_tasks)
    print('wait')
    # as_completed is a generator: whichever task finishes first is yielded first
    for var_one_complete_task in as_completed(var_tasks):
        print(var_one_complete_task.result())
    print('*' * 20)
    # map() yields results in the same order as var_times
    var_tasks_data = var_execute_01.map(getHtml, var_times)
    for var_one_tasks_data in var_tasks_data:
        print(var_one_tasks_data)
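One detail worth knowing: Future.result() re-raises any exception thrown inside the worker, so errors are usually handled around the result() call. A minimal sketch, with the failing task invented for illustration:

from concurrent.futures import ThreadPoolExecutor, as_completed

def may_fail(var_n):
    if var_n == 0:
        raise ValueError('bad input')
    return 10 / var_n

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as var_executor:
        var_tasks = [var_executor.submit(may_fail, var_n) for var_n in (0, 1, 2)]
        for var_task in as_completed(var_tasks):
            try:
                # result() re-raises the worker's exception here in the main thread
                print(var_task.result())
            except ValueError as var_err:
                print('task failed:', var_err)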
7. Multiprocessing vs. multithreading
# Use multiprocessing for CPU-bound work and multithreading for IO-bound work;
# switching processes costs more than switching threads.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor

def fib(var_n):
    if var_n <= 2:
        return 1
    return fib(var_n - 1) + fib(var_n - 2)

def threadPool():
    with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, var_num) for var_num in range(25, 35)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('thread run time is {} s'.format(time.time() - var_start_time))

def processPool():
    with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, var_num) for var_num in range(25, 35)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('process run time is {} s'.format(time.time() - var_start_time))

def randomSleep(var_n):
    time.sleep(var_n)
    return var_n

def threadIoPool():
    with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, var_num) for var_num in range(5, 10)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('thread io run time is {} s'.format(time.time() - var_start_time))

def processIoPool():
    with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, var_num) for var_num in range(5, 10)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('process io run time is {} s'.format(time.time() - var_start_time))

if __name__ == '__main__':
    threadPool()
    processPool()
    threadIoPool()
    processIoPool()
8. A fork example
import os
import time

print('bobby')
# fork() only works on Linux/Unix. It creates a child process that gets a copy of the
# parent's data, and the child runs the code that comes after the fork() call.
var_pid = os.fork()
print('bobby1')
if var_pid == 0:
    print('child process: {}, parent process: {}'.format(os.getpid(), os.getppid()))
else:
    print('I am the parent process: {}'.format(var_pid))
    time.sleep(2)
'''
bobby
bobby1
I am the parent process: 951
bobby1
child process: 951, parent process: 950
'''
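On the same Linux/Unix systems, the parent usually waits for the child explicitly instead of sleeping. A minimal sketch using os.waitpid (illustrative, not part of the original example):

import os

var_pid = os.fork()
if var_pid == 0:
    print('child {} doing work'.format(os.getpid()))
    # exit the child explicitly so it does not fall through to the parent's code below
    os._exit(0)
else:
    # block until the child exits, which also avoids leaving a zombie process behind
    var_finished_pid, var_status = os.waitpid(var_pid, 0)
    print('child {} exited with status {}'.format(var_finished_pid, var_status))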
9. Using multiprocessing
import multiprocessing
import time

def getSleep(var_n):
    time.sleep(var_n)
    print('The child process ran successfully')
    return var_n

if __name__ == '__main__':
    var_process = multiprocessing.Process(target=getSleep, args=(2,))
    # before start() the process has no pid yet, so this prints None
    print(var_process.pid)
    var_process.start()
    print(var_process.pid)
    var_process.join()
    print('The main process runs successfully')
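Process.start() does not hand the target's return value back to the parent. One common pattern (a sketch with invented names, not from the original) is to pass results back through a multiprocessing.Queue:

import multiprocessing
import time

def getSleep(var_n, var_result_queue):
    time.sleep(var_n)
    # send the "return value" back to the parent through the queue
    var_result_queue.put(var_n)

if __name__ == '__main__':
    var_result_queue = multiprocessing.Queue()
    var_process = multiprocessing.Process(target=getSleep, args=(2, var_result_queue))
    var_process.start()
    # get() blocks until the child has put its result
    print(var_result_queue.get())
    var_process.join()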
10. Process pools
import multiprocessing
import time

def getSleep(var_n):
    time.sleep(var_n)
    print('The child process ran successfully')
    return var_n

if __name__ == '__main__':
    # create a process pool
    var_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    '''
    var_result = var_pool.apply_async(getSleep, args=(3,))
    # close the pool so it accepts no new tasks
    var_pool.close()
    # wait for all tasks to finish
    var_pool.join()
    # print the result
    print(var_result.get())
    '''
    '''
    # imap yields results in the same order the tasks were submitted
    for var_one_result in var_pool.imap(getSleep, [3, 2, 1]):
        print(var_one_result)
    '''
    # imap_unordered yields whichever task finishes first
    for var_one_result in var_pool.imap_unordered(getSleep, [3, 2, 1]):
        print(var_one_result)
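apply_async also accepts a callback that the pool calls in the parent process with the result as soon as a task finishes. A minimal sketch (the callback name is invented for illustration):

import multiprocessing
import time

def getSleep(var_n):
    time.sleep(var_n)
    return var_n

def onDone(var_result):
    # runs in the parent process when the task completes
    print('task finished with', var_result)

if __name__ == '__main__':
    var_pool = multiprocessing.Pool(2)
    for var_n in (2, 1):
        var_pool.apply_async(getSleep, args=(var_n,), callback=onDone)
    var_pool.close()
    var_pool.join()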
11. Inter-process communication: Queue and Manager
import time
from multiprocessing import Process, Queue, Pool, Manager

def producer(var_queue):
    var_queue.put('a')
    time.sleep(2)

def consumer(var_queue):
    time.sleep(2)
    var_data = var_queue.get()
    print(var_data)

if __name__ == '__main__':
    '''
    var_queue = Queue(10)
    var_producer = Process(target=producer, args=(var_queue,))
    var_consumer = Process(target=consumer, args=(var_queue,))
    var_producer.start()
    var_consumer.start()
    var_producer.join()
    var_consumer.join()
    '''
    # multiprocessing.Queue cannot be passed to a process pool
    '''
    var_queue = Queue(10)
    var_pool = Pool(2)
    var_pool.apply_async(producer, args=(var_queue,))
    var_pool.apply_async(consumer, args=(var_queue,))
    var_pool.close()
    var_pool.join()
    '''
    # Manager can be used for communication between pool processes
    var_queue = Manager().Queue(10)
    var_pool = Pool(2)
    var_pool.apply_async(producer, args=(var_queue,))
    var_pool.apply_async(consumer, args=(var_queue,))
    var_pool.close()
    var_pool.join()
from multiprocessing import Manager, Process

def addDict(var_dict, var_key, var_value):
    var_dict[var_key] = var_value

if __name__ == '__main__':
    var_dict = Manager().dict()
    first_process = Process(target=addDict, args=(var_dict, 'a', 'a'))
    second_process = Process(target=addDict, args=(var_dict, 'b', 'b'))
    first_process.start()
    second_process.start()
    first_process.join()
    second_process.join()
    print(var_dict)
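Manager also provides shared list, Lock, and other types, used the same way as the shared dict above. A small sketch with a Manager list (illustrative, not part of the original):

from multiprocessing import Manager, Process

def appendItem(var_list, var_value):
    var_list.append(var_value)

if __name__ == '__main__':
    var_list = Manager().list()
    var_procs = [Process(target=appendItem, args=(var_list, var_i)) for var_i in range(4)]
    for var_p in var_procs:
        var_p.start()
    for var_p in var_procs:
        var_p.join()
    # list(...) copies the proxy's contents into a normal list for printing
    print(list(var_list))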
12. Inter-process communication: Pipe
# A Pipe connects exactly two processes; it performs better than Queue.
import time
from multiprocessing import Process, Pipe

def producer(var_pipe):
    var_pipe.send('a')
    time.sleep(2)

def consumer(var_pipe):
    time.sleep(2)
    var_data = var_pipe.recv()
    print(var_data)

if __name__ == '__main__':
    var_recv, var_send = Pipe()
    var_producer = Process(target=producer, args=(var_send,))
    var_consumer = Process(target=consumer, args=(var_recv,))
    var_producer.start()
    var_consumer.start()
    var_producer.join()
    var_consumer.join()
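By default Pipe() is duplex, so both ends can send and receive. A minimal two-way sketch (names invented for illustration, not from the original example):

from multiprocessing import Process, Pipe

def child(var_conn):
    # receive the request, then answer on the same connection
    var_msg = var_conn.recv()
    var_conn.send('pong: ' + var_msg)
    var_conn.close()

if __name__ == '__main__':
    var_parent_conn, var_child_conn = Pipe()  # duplex by default
    var_proc = Process(target=child, args=(var_child_conn,))
    var_proc.start()
    var_parent_conn.send('ping')
    print(var_parent_conn.recv())
    var_proc.join()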
That wraps up how to implement multithreading and multiprocessing in Python. Hopefully the material above is helpful; if you found the article useful, feel free to share it so more people can see it.