您好,登錄后才能下訂單哦!
這篇文章主要介紹了C++11中線程鎖和條件變量的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
std::thread類, 位于<thread>頭文件,實現了線程操作。std::thread可以和普通函數和 lambda 表達式搭配使用。它還允許向線程的執行函數傳遞任意多參數。
#include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; }
上面的例子中,t是一個線程實例,函數func()在該線程運行。調用join()函數是為了阻塞當前線程(此處即主線程),直到t線程執行完畢。線程函數的返回值都會被忽略,但線程函數接受任意數目的輸入參數。
void func(int i, double d, const std::string& s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std::thread t(func, 1, 12.50, "sample"); t.join(); return 0; }
雖然可以向線程函數傳遞任意多參數,但都必須以值傳遞。如果需以引用傳遞,則必須以std::ref或std::cref封裝,如下例所示:
void func(int& a) { a++; } int main() { int a = 42; std::thread t(func, std::ref(a)); t.join(); std::stringcout << a << std::endl; return 0; }
這個程序會打印43,但如果不用std::ref封裝,則輸出會是42。
除了join函數,這個類還提供更多的操作:
swap:交換兩個線程實例的句柄
detach:允許一個線程繼續獨立于線程實例運行;detach 過的線程不可以再 join
int main() { std::thread t(funct); t.detach(); return 0; }
一個重要的知識點是,如果一個線程函數拋出異常,并不會被常規的try-catch方法捕獲。也就是說,下面的寫法是不會奏效的:
try { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); } catch(const std::exception& ex) { std::cout << ex.what() << std::endl; }
要追蹤線程間的異常,你可以在線程函數內捕獲,暫時存儲在一個稍后可以訪問的結構內。
std::mutex g_mutex; std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception("something wrong happened"); } void func() { try { throw_function(); } catch(...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std::thread t(func); t.join(); for(auto& e : g_exceptions) { try { if(e != nullptr) { std::rethrow_exception(e); } } catch(const std::exception& e) { std::cout << e.what() << std::endl; } } return 0; }
關于捕獲和處理異常,更深入的信息可以參看Handling C++ exceptions thrown from worker thread in the main thread和How can I propagate exceptions between threads?。
此外,值得注意的是,頭文件還在 `std::this_thread` 命名空間下提供了一些輔助函數:
get_id: 返回當前線程的 id
yield: 告知調度器運行其他線程,可用于當前處于繁忙的等待狀態
sleep_for:給定時長,阻塞當前線程
sleep_until:阻塞當前線程至給定時間點
在上個例子中,我們需要對g_exceptions這個 vector 的訪問進行同步處理,確保同一時刻只有一個線程能向它插入新的元素。為此我使用了一個 mutex 和一個鎖(lock)。mutex 是同步操作的主體,在 C++ 11 的<mutex>頭文件中,有四種風格的實現:
mutex:提供了核心的lock()unlock()方法,以及當 mutex 不可用時就會返回的非阻塞方法try_lock()
recursive_mutex:允許同一線程內對同一 mutex 的多重持有
timed_mutex: 與mutex類似,但多了try_lock_for()try_lock_until()兩個方法,用于在特定時長里持有 mutex,或持有 mutex 直到某個特定時間點
recursive_timed_mutex:recursive_mutex和timed_mutex的結合
下面是一個使用std::mutex的例子(注意get_id()和sleep_for()兩個輔助方法的使用)。
#include <iostream> #include <thread> #include <mutex> #include <chrono> std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(rand() % 10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand((unsigned int)time(0)); std::thread t1(func); std::thread t2(func); std::thread t3(func); t1.join(); t2.join(); t3.join(); return 0; }
輸出如下:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424
lock()unlock()兩個方法應該很好懂,前者鎖住 mutex,如果該 mutex 不可用,則阻塞線程;稍后,后者解鎖線程。
下面一個例子展示了一個簡單的線程安全的容器(內部使用了std::vector)。該容器提供用于添加單一元素的add()方法,以及添加多個元素的addrange()方法(內部調用add()實現)。
注意:盡管如此,下面會指出,由于va_args的使用等原因,這個容器并非真正線程安全。此外,dump()方法不應屬于容器,在實際實現中它應該作為一個獨立的輔助函數。這個例子的目的僅僅是展示 mutex 的相關概念,而非實現一個完整的線程安全的容器。
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
當你運行這個程序時,會進入死鎖。原因:在 mutex 被釋放前,容器嘗試多次持有它,這顯然不可能。這就是為什么引入std::recursive_mutex,它允許一個線程對 mutex 多重持有。允許的最大持有次數并不確定,但當達到上限時,線程鎖會拋出std::system_error錯誤。因此,要解決上面例子的錯誤,除了修改addrange令其不再調用lock和unlock之外,可以用std::recursive_mutex代替mutex。
template <typename T> class container { std::recursive_mutex _lock; // ... };
成功輸出:
6334
18467
41
6334
18467
41
6334
18467
41
敏銳的讀者可能注意到,每次調用func()輸出的都是相同的數字。這是因為,seed 是線程局部量,調用srand()只會在主線程中初始化 seed,在其他工作線程中 seed 并未被初始化,所以每次得到的數字都是一樣的。
手動加鎖和解鎖可能造成問題,比如忘記解鎖或鎖的次序出錯,都會造成死鎖。C++ 11 標準提供了若干類和函數來解決這個問題。封裝類允許以 RAII 風格使用 mutex,在一個鎖的生存周期內自動加鎖和解鎖。這些封裝類包括:
lock_guard:當一個實例被創建時,會嘗試持有 mutex (通過調用lock());當實例銷毀時,自動釋放 mutex (通過調用unlock())。不允許拷貝。
unique_lock:通用 mutex 封裝類,與lock_guard不同,還支持延遲鎖、計時鎖、遞歸鎖、移交鎖的持有權,以及使用條件變量。不允許拷貝,但允許轉移(move)。
借助這些封裝類,可以把容器改寫為:
template <typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public: void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add(va_arg(arguments, T)); } va_end(arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
讀者可能會提出,dump()方法不更改容器的狀態,應該設為 const。但如果你添加 const 關鍵字,會得到如下編譯錯誤:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
一個 mutex (不管何種風格)必須被持有和釋放,這意味著lock()unlock方法必被調用,這兩個方法是 non-const 的。所以,邏輯上lock_guard的聲明不能是 const (若該方法 為 const,則 mutex 也為 const)。這個問題的解決辦法是,將 mutex 設為mutable。mutable允許由 const 方法更改 mutex 狀態。不過,這種用法僅限于隱式的,或「元(meta)」狀態——譬如,運算過的高速緩存、檢索完成的數據,使得下次調用能瞬間完成;或者,改變像 mutex 之類的位元,僅僅作為一個對象的實際狀態的補充。
template <typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public: void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
這些封裝類鎖的構造函數可以通過重載的聲明來指定鎖的策略。可用的策略有:
defer_lock_t類型的defer_lock:不持有 mutex
try_to_lock_t類型的try_to_lock: 嘗試持有 mutex 而不阻塞線程
adopt_lock_t類型的adopt_lock:假定調用它的線程已持有 mutex
這些策略的聲明方式如下:
struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了這些 mutex 封裝類之外,標準庫還提供了兩個方法用于鎖住一個或多個 mutex:
lock:鎖住 mutex,通過一個避免了死鎖的算法(通過調用lock(),try_lock()和unlock()實現)
try_lock:嘗試通過調用try_lock()來調用多個 mutex,調用次序由 mutex 的指定次序而定
下面是一個死鎖案例:有一個元素容器,以及一個exchange()函數用于互換兩個容器里的某個元素。為了實現線程安全,這個函數通過一個和容器關聯的 mutex,對這兩個容器的訪問進行同步。
template <typename T> class container { public: std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove(T element) { _elements.erase(element); } }; void exchange(container<int>& cont1, container<int>& cont2, int value) { cont1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock cont2._lock.lock(); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
假如這個函數在兩個線程中被調用,在其中一個線程中,一個元素被移出容器 1 而加到容器 2;在另一個線程中,它被移出容器 2 而加到容器 1。這可能導致死鎖——當一個線程剛持有第一個鎖,程序馬上切入另一個線程的時候。
int main() { srand((unsigned int)time(NULL)); container<int> cont1; cont1.add(1); cont1.add(2); cont1.add(3); container<int> cont2; cont2.add(4); cont2.add(5); cont2.add(6); std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3); std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6); t1.join(); t2.join(); return 0; }
要解決這個問題,可以使用std::lock,保證所有的鎖都以不會死鎖的方式被持有:
void exchange(container<int>& cont1, container<int>& cont2, int value) { std::lock(cont1._lock, cont2._lock); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
C++ 11 提供的另一個同步機制是條件變量,用于阻塞一個或多個線程,直到接收到另一個線程的通知信號,或暫停信號,或偽喚醒信號。在<condition_variable>頭文件里,有兩個風格的條件變量實現:
condition_variable:所有需要等待這個條件變量的線程,必須先持有一個std::unique_lock
condition_variable_any:更通用的實現,任何滿足鎖的基本條件(提供lock()和unlock()功能)的類型都可以使用;在性能和系統資源占用方面可能消耗更多,因而只有在它的靈活性成為必需的情況下才應優先使用
條件變量的工作機制如下:
至少有一個線程在等待某個條件成立。等待的線程必須先持有一個unique_lock鎖。這個鎖被傳遞給wait()方法,這會釋放 mutex,阻塞線程直至條件變量收到通知信號。當收到通知信號,線程喚醒,重新持有鎖。
至少有一個線程在發送條件成立的通知信號。信號的發送可以用notify_one()方法, 只解鎖任意一個正在等待通知信號的線程,也可以用notify_all()方法, 解鎖所有等待條件成立信號的線程。
在多核處理器系統上,由于使條件喚醒完全可預測的某些復雜機制的存在,可能發生偽喚醒,即一個線程在沒有別的線程發送通知信號時也會喚醒。因而,當線程喚醒時,檢查條件是否成立是必要的。而且,偽喚醒可能多次發生,所以條件檢查要在一個循環里進行。
下面的代碼展示使用條件變量進行線程同步的實例: 幾個工作員線程在運行過程中會產生錯誤,他們將錯誤碼存在一個隊列里。一個記錄員線程處理這些錯誤碼,將錯誤碼從記錄隊列里取出并打印出來。工作員會在發生錯誤時,給記錄員發送信號。記錄員則等待條件變量的通知信號。為了避免偽喚醒,等待工作放在一個檢查布爾值的循環內。
#include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std::thread loggerthread(loggerfunc); // start the working threads std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) { threads.push_back(std::thread(workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for(auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true; loggerthread.join(); return 0; }
運行這個程序,輸出如下(注意這個輸出在每次運行下都會改變,因為每個工作員線程的工作和休眠的時間間隔是任意的):
[logger] running...
[worker 1] running...
[worker 2] running...
[worker 3] running...
[worker 4] running...
[worker 5] running...
[worker 1] an error occurred: 101
[worker 2] an error occurred: 201
[logger] processing error: 101
[logger] processing error: 201
[worker 5] an error occurred: 501
[logger] processing error: 501
[worker 3] an error occurred: 301
[worker 4] an error occurred: 401
[logger] processing error: 301
[logger] processing error: 401
上面的wait()有兩個重載:
其中一個只需要傳入一個unique_lock;這個重載方法釋放鎖,阻塞線程并將其添加到一個等待該條件變量的線程隊列里;該線程在收到條件變量通知信號或偽喚醒時喚醒,這時鎖被重新持有,函數返回。
另外一個在unique_lock之外,還接收一個謂詞(predicate),循環直至其返回 false;這個重載可用于避免偽喚醒,其功能類似于:
while(!predicate()) wait(lock);
于是,上面例子中布爾值g_notified可以不用,而代之以wait的接收謂詞的重載,用于確認狀態隊列的狀態(是否為空):
void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } }
除了可重載的wait(),還有另外兩個等待方法,都有類似的接收謂詞以避免偽喚醒的重載方法:
wait_for:阻塞線程,直至收到條件變量通知信號,或指定時間段已過去。
wait_until:阻塞線程,直到收到條件變量通知信號,或指定時間點已達到。
這兩個方法如果不傳入謂詞,會返回一個cv_status,告知是到達設定時間還是線程因條件變量通知信號或偽喚醒而喚醒。
標準庫還提供了notify_all_at_thread_exit方法,實現了通知其他線程某個給定線程已經結束,以及銷毀所有thread_local實例的機制。引入這個方法的原因是,在使用thread_local時, 等待一些通過非join()機制引入的線程可能造成錯誤行為,因為在等待的線程恢復或可能結束之后,他們的析構方法可能還在被調用(參看N3070和N2880)。特別的,對這個函數的一個調用,必須發生在線程剛好退出之前。下面是一個notify_all_at_thread_exit和condition_variable搭配使用來同步兩個線程的實例:
std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; }
如果 worker 在主線程之前結束,輸出如下:
main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...
如果主線程在 worker 線程之前結束,輸出如下:
main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...
感謝你能夠認真閱讀完這篇文章,希望小編分享的“C++11中線程鎖和條件變量的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。