您好,登錄后才能下訂單哦!
?
目錄
threading.RLock類:... 1
threading.Condition類:... 2
threading.Barrier類:... 4
?
?
?
可重入鎖,是線程相關的鎖;
線程A獲得可重復鎖,并可多次成功獲取,不會阻塞,最后要在線程A中做和acquire次數相同的release(拿多少次鎖,還多少回來);
?
注,線程相關:
threading.local類;
?
例:
lock = threading.RLock()
ret = lock.acquire()
print(ret)
ret = lock.acquire(timeout=5)
print(ret)
ret = lock.acquire(False)
print(ret)
ret = lock.acquire(False)?? #全能拿到鎖
print(ret)
?
lock.release()
lock.release()
lock.release()
lock.release()
?
# lock.release()?? #前面沒有對應的acquire,拋RuntimeError: cannot release un-acquired lock
?
def sub(lock:threading.RLock):
??? lock.release()?? #主線程中加的,不能在子線程中釋放,理解線程級別
???
threading.Thread(target=sub, args=(lock,)).start()
輸出:
True
True
True
True
Exception in thread Thread-1:
Traceback (most recent call last):
? File "D:\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner
??? self.run()
? File "D:\Python\Python35\lib\threading.py", line 862, in run
??? self._target(*self._args, **self._kwargs)
? File "E:/git_practice/cmdb/example_threading2.py", line 249, in sub
??? lock.release()
RuntimeError: cannot release un-acquired lock
?
?
?
Condition(lock=None),構造方法,可傳入一個lock或RLock對象,默認是RLock;
?
cond = threading.Condition()
cond.acquire(*args),獲取鎖;
cond.release()
cond.wait(timeout=None),等待或超時;
cond.notify(n=1),喚醒至多指定數目個數的等待線程,默認1個,沒有等待的線程就沒有任何操作,源碼中waiter;
cond.notify_all(),喚醒所有等待的線程,源碼中waiters;
?
總結:
Condition用于生產者-消費者模型,解決生產者-消費者速度匹配問題;
采用了通知機制,非常有效率;
使用Condition,必須先acquire,用完要release,因為內部使用了鎖,默認使用RLock,最好的方式是使用with上下文;
消費者wait等待通知,生產者生產好消息,對消費者發通知,可使用notify或notify_all;
?
可把Condition理解為一把高級的鎖,它提供了Lock、RLock更高級的功能,允許我們能夠控制復雜的線程同步問題;
threading.Condition內部維護了一個鎖對象(默認是RLock),可在創建Condition對象時把鎖對象作為參數傳入;
threading.Condition也提供了acquire和release方法,含義與鎖的一致,其實它只是簡單調用內部鎖對象的對應的方法而已;
threading.Condition還提供了wait、notify、notify_all方法:
wait([timeout]),釋放內部所占用的鎖,同時線程被掛起,直至接收到通知被喚醒或超時(如果提供timeout),當線程被喚醒并重新占用鎖時,程序才會繼續執行下去;
notify(),喚醒一個掛起的線程(如果存在掛起的線程),notify()不會釋放所占用的鎖;
notify_all(),喚醒所有掛起的線程(如果存在掛起的線程),不會釋放所占用的鎖;
?
Lock與RLock:
RLock允許在同一線程中被多次acquire,而Lock不允許這種情況;
如果使用RLock,那么 acquire和release必須成對出現,即調用了n次acquire,必須調用n次release才能真正釋放所占用的鎖;
?
例:
class Dispatcher:
??? def __init__(self):
??????? self.data = 0
??????? self.event = threading.Event()
?
??? def produce(self):
??????? for i in range(100):
??????????? data = random.randint(1,100)
??????????? self.data = data
??????????? self.event.wait(1)
?
??? def custom(self):
??????? while True:?? #消費者浪費了大量cpu時間,主動來查看有沒有數據
??????????? logging.info(self.data)?? #有重復消費問題
??????????? self.event.wait(1)?? #隔1秒生成1個
?
d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c.start()?? #消費者先啟動
p.start()
輸出:
……
2018-08-06-15:54:25?????? Thread info: 13052 Thread-1 13
2018-08-06-15:54:25?????? Thread info: 12052 Thread-2 13
2018-08-06-15:54:26?????? Thread info: 12052 Thread-2 13
……
?
例:
class Dispatcher:
??? def __init__(self):
??????? self.data = 0
??????? self.event = threading.Event()
??????? self.cond = threading.Condition()
?
??? def produce(self):
??????? for i in range(100):
??????????? data = random.randint(1,100)
??????????? # logging.info(data)
??????????? with self.cond:
??????????????? self.data = data
??????????????? self.cond.notify(2)?? #通知機制,有數據,通知消費者來消費;交給2個人做,一般是1(生產者)對多(消費者)
?????????? ?????self.cond.notify_all()?? #通知所有消費者,1對多
??????????? self.event.wait(1)
?
??? def custom(self):
??????? # while True:
??????? while not self.event.is_set():
??????????? # logging.info(self.data)
??????????? with self.cond:?? #消費者被迫匹配生產者
??????????????? self.cond.wait()
??????????????? logging.info(self.data)
??????????? # self.event.wait(1)
?
d = Dispatcher()
p = threading.Thread(target=d.produce)
# c = threading.Thread(target=d.custom)
# c1 = threading.Thread(target=d.custom)?? #開啟2個消費線程
# c.start()
# c1.start()
for i in range(5):?? #開啟5個消費線程;如果produce中self.conf.notify(2),生產者通知2個線程處理,5個消費者中誰搶在前誰處理
??? threading.Thread(target=d.custom, name='c-{}'.format(i)).start()
p.start()?? #如果生產者先啟動,已經生成的數據不會被消費者消費,除非在隊列中
?
注:
以上有線程安全問題,解決:中間加MQ;
上例不是線程安全的,程序邏輯有很多瑕疵,但可很好的理解Condition的使用和生產者消費者模型;
一對多,其實就是廣播模式;
?
?
?
屏障、柵欄,可以想象成路障、道閘,3.2引入;
Barrier(paties,action=None,timeout=None),構建Barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值;
n_waiting,當前在屏障中等待的線程數;
paties,參與方數目,需要多少個等待;
wait(timeout=None),等待通過屏障,返回0到線程數count-1的整數,count為等待的線程總數,每個線程返回不同;如果wait方法設置了超時,并超時發送,屏障將處于broken狀態;wait方法超時發生,屏障處于broken狀態,直至reset;
broken,如果屏障處于打破的狀態,返回True;
abort(),將屏障置于broken狀態,等待中的線程或調用等待方法的線程中都會拋BrokenBarrierError異常,直至reset方法來恢復屏障;
reset(),恢復屏障,重新開始攔截;
?
應用場景:
1、并發初始化;如,centos7中systemd,能并行啟動就并行;
所有線程都必須初始化完成后,才能繼續工作,如運行前加載數據、檢查,如果這些工作沒完成就開始運行,將不能正常工作;
10個線程做10種工作準備,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程;
如,啟動一個程序,先加載磁盤文件、緩存預熱、初始化連接池等,這些工作齊頭并進,不過只有等滿足了,程序才能繼續后向執行,假設數據庫連接失敗,則初始化工作失效,就要abort,屏障broken,所有線程收到異常退出;
2、工作量,有10個計算任務,完成6個就算工作完成,如求樣本數、求平均數;
?
例:
def worker(barrier:threading.Barrier):
??? logging.info('n_waiting={}'.format(barrier.n_waiting))
??? try:
??????? bid = barrier.wait()
??????? logging.info('after barrier {}'.format(bid))
??? except threading.BrokenBarrierError:
? ??????logging.info('broken barrier is {}'.format(threading.current_thread()))
?
barrier = threading.Barrier(3)?? #3個一撥3個一撥
?
for _ in range(3):?? #依次3,4,5,6
??? threading.Thread(target=worker,args=(barrier,)).start()
輸出:
2018-08-07-08:27:53?????? Thread info: 11496 Thread-1 n_waiting=0
2018-08-07-08:27:53?????? Thread info: 12540 Thread-2 n_waiting=1
2018-08-07-08:27:53?????? Thread info: 4612 Thread-3 n_waiting=2
2018-08-07-08:27:53?????? Thread info: 4612 Thread-3 after barrier 2
2018-08-07-08:27:53?????? Thread info: 11496 Thread-1 after barrier 0
2018-08-07-08:27:53?????? Thread info: 12540 Thread-2 after barrier 1
?
例:
for i in range(6):
??? if i == 2:?? #屏障中等待2個,屏障被broken,wait的線程拋異常,新wait的線程也拋異常,直至屏障恢復,才繼續按達到參與方的數目繼續攔截
??????? barrier.abort()
??? elif i == 3:
??????? barrier.reset()
??? threading.Event().wait(1)
??? threading.Thread(target=worker,args=(barrier,)).start()
輸出:
2018-08-07-09:21:49?????? Thread info: 12668 Thread-1 n_waiting=0
2018-08-07-09:21:50?????? Thread info: 12424 Thread-2 n_waiting=1
2018-08-07-09:21:50?????? Thread info: 12424 Thread-2 broken barrier is <Thread(Thread-2, started 12424)>
2018-08-07-09:21:50?????? Thread info: 12668 Thread-1 broken barrier is <Thread(Thread-1, started 12668)>
2018-08-07-09:21:51?????? Thread info: 11468 Thread-3 n_waiting=0
2018-08-07-09:21:51?????? Thread info: 11468 Thread-3 broken barrier is <Thread(Thread-3, started 11468)>
2018-08-07-09:21:52?????? Thread info: 9788 Thread-4 n_waiting=0
2018-08-07-09:21:53?????? Thread info: 12680 Thread-5 n_waiting=1
2018-08-07-09:21:54?????? Thread info: 10948 Thread-6 n_waiting=2
2018-08-07-09:21:54?????? Thread info: 10948 Thread-6 after barrier 2
2018-08-07-09:21:54?????? Thread info: 9788 Thread-4 after barrier 0
2018-08-07-09:21:54?????? Thread info: 12680 Thread-5 after barrier 1
?
例:
wait方法超時發生,屏障處于broken狀態,直至reset;
?
def worker(barrier:threading.Barrier, i:int):
??? logging.info('waiting for {} threads'.format(barrier.n_waiting))
??? try:
??????? logging.info(barrier.broken)
??????? if i < 3:
??????????? barrier_id = barrier.wait(1)
??????? else:
??????????? if i == 6:
??????????????? barrier.reset()
??????????? barrier_id = barrier.wait()
??????? logging.info('after barrier {}'.format(barrier_id))
??? except threading.BrokenBarrierError:
??????? logging.info('broken barrier. run.')
?
barrier = threading.Barrier(3)
?
for x in range(9):
??? threading.Event().wait(2)
??? threading.Thread(target=worker, args=(barrier,x), name='worker-{}'.format(x)).start()
輸出:
2018-08-07-09:33:24?????? Thread info: 10556 worker-0 waiting for 0 threads
2018-08-07-09:33:24?????? Thread info: 10556 worker-0 False
2018-08-07-09:33:25?????? Thread info: 10556 worker-0 broken barrier. run.
2018-08-07-09:33:26?????? Thread info: 12752 worker-1 waiting for 0 threads
2018-08-07-09:33:26?????? Thread info: 12752 worker-1 True
2018-08-07-09:33:26?????? Thread info: 12752 worker-1 broken barrier. run.
2018-08-07-09:33:28?????? Thread info: 5324 worker-2 waiting for 0 threads
2018-08-07-09:33:28?????? Thread info: 5324 worker-2 True
2018-08-07-09:33:28?????? Thread info: 5324 worker-2 broken barrier. run.
2018-08-07-09:33:30?????? Thread info: 6716 worker-3 waiting for 0 threads
2018-08-07-09:33:30?????? Thread info: 6716 worker-3 True
2018-08-07-09:33:30?????? Thread info: 6716 worker-3 broken barrier. run.
2018-08-07-09:33:32?????? Thread info: 9180 worker-4 waiting for 0 threads
2018-08-07-09:33:32?????? Thread info: 9180 worker-4 True
2018-08-07-09:33:32?????? Thread info: 9180 worker-4 broken barrier. run.
2018-08-07-09:33:34?????? Thread info: 6788 worker-5 waiting for 0 threads
2018-08-07-09:33:34?????? Thread info: 6788 worker-5 True
2018-08-07-09:33:34?????? Thread info: 6788 worker-5 broken barrier. run.
2018-08-07-09:33:36?????? Thread info: 12044 worker-6 waiting for 0 threads
2018-08-07-09:33:36?????? Thread info: 12044 worker-6 True
2018-08-07-09:33:38?????? Thread info: 5020 worker-7 waiting for 1 threads
2018-08-07-09:33:38?????? Thread info: 5020 worker-7 False
2018-08-07-09:33:40?????? Thread info: 13052 worker-8 waiting for 2 threads
2018-08-07-09:33:40?????? Thread info: 13052 worker-8 False
2018-08-07-09:33:40?????? Thread info: 13052 worker-8 after barrier 2
2018-08-07-09:33:40?????? Thread info: 5020 worker-7 after barrier 1
2018-08-07-09:33:40?????? Thread info: 12044 worker-6 after barrier 0
?
?
?
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。