C++多線程 第三章 在線程間共享數據
第三章 在線程間共享數據
共享數據基本問題
如果所有共享數據都只讀,那就沒有問題.
不變量(invariants): 對特定數據結構總為真的語句.例如:"該變量表示線程數量."
修改線程之間共享數據的一個常見潛在問題就是破壞不變量.
競爭條件(race condition): 線程競爭執行各自的操作,導致不變量的破壞.
數據競爭(data race): 因對當個對象的并發修改而產生的特定類型的競爭條件.
軟件事務內存(STM): 所需的一系列數據修改和讀取被存儲在一個事務日志中,然后在單個步驟中提交.如果該提交因為數據結構已被另一個線程修改而無法進行,該事務將重新啟動.
使用互斥元
互斥元(mutex): 在訪問共享數據結構之前,鎖定(lock) 該數據相關互斥元;當訪問數據結構完成后, 解鎖(unlock) 該互斥元.線程庫會保證一旦某個線程鎖定了某個互斥元,所有試圖鎖定相同互斥元的其他線程都需要等待.
- 創建互斥元:
std::mutex some_mutex;
- 鎖定互斥元:
some_mutex.lock();
- 解鎖互斥元:
some_mutex.unlock();
- 使用RAII慣用語法的互斥元:
std::lock_guard<std::mutex>guard(some_mutex);
為了解釋互斥元的使用,在這里使用一個簡單的例子對其進行解釋:
#include <iostream>
#include <thread>
#include <windows.h>
int count = 0;
void func_1()
{
for (int i = 0; i < 10000; i++)
count++;
return;
}
void func_2()
{
for (int i = 0; i < 10000; i++)
count++;
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
for (int i = 0; i < 10000; i++)
count++;
std::cout << "the value of count is :" << count;
return 0;
}
由于對全局變量count的搶用,事實上,這個程序最終得到的結果是隨機的.
然而,通過對std::mutex的使用,我們可以盡可能避免這一問題:
#include <iostream>
#include <thread>
#include <mutex>
#include <windows.h>
int count = 0;
std::mutex count_mutex;
void func_1()
{
std::lock_guard<std::mutex>guard(count_mutex);
for (int i = 0; i < 10000; i++)
count++;
return;
}
void func_2()
{
std::lock_guard<std::mutex>guard(count_mutex);
for (int i = 0; i < 10000; i++)
count++;
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
for (int i = 0; i < 10000; i++)
count++;
std::cout << "the value of count is :" << count;
return 0;
}
在上面的代碼中,我們首先創建了一個全局std::mutex互斥量.
然后,在使用的過程中,我們使用RAII慣用語法std::lock_guard來對其進行管理,保證了過程的完整性.
保護并行數據
由于并行常常帶來一些意想不到的問題,所以我們需要思考如何更好地保護并行程序中的數據,下面是一個有趣的例子:
#include <iostream>
#include <thread>
#include <mutex>
#include <format>
std::mutex func_mutex;
template<typename Function>
void doSomething(Function func)
{
std::lock_guard<std::mutex>guard(func_mutex);
int data = 100;
func(data);
std::cout << std::format("the data is: {}", data);
return;
}
void badFunc(int& data)
{
data = 200;
return;
}
int main()
{
std::thread t(&doSomething<void(int&)>, badFunc);
t.join();
std::lock_guard<std::mutex>guard(func_mutex);
return 0;
}
在上面的例子中,我們設計了一個函數doSomething,其接收外部的函數來對數據進行操作.
然而,我們模擬了一個惡意函數badFunc傳入的情景: 它通過引用繞開了鎖并修改了數據!
這在大多數情況下當然不是我們想要的.
因而記住: 不要將指向受保護數據的指針與引用傳遞到鎖的范圍之外.
死鎖
死鎖(deadlock): 一對線程中的每一個都需要同時鎖定兩個互斥元來執行一些操作,并且每個線程都擁有了一個互斥元,同時等待另外一個.兩個線程都無法繼續.
下面是一個死鎖的例子:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex_1;
std::mutex mutex_2;
void func_1()
{
for (int i = 0; i < 10; i++) {
std::cout << "now is func_1" << std::endl;
std::lock_guard<std::mutex>guard_1(mutex_1);
std::lock_guard<std::mutex>guard_2(mutex_2);
}
std::cout << "func_1" << std::endl;
return;
}
void func_2()
{
for (int i = 0; i < 10; i++) {
std::cout << "now is func_2" << std::endl;
std::lock_guard<std::mutex>guard_2(mutex_2);
std::lock_guard<std::mutex>guard_1(mutex_1);
}
std::cout << "func_2" << std::endl;
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
return 0;
}
為了避免死鎖,常見的建議是始終使用相同的順序鎖定這兩個互斥元.這在大多數情況有效.
但是,實際上,有一種更為方便的方法: 通過std::lock同時鎖定兩個或更多互斥元.
- 同時鎖定多個互斥元:
std::lock(mutex_1,mutex_2[,other...]);
- 將已鎖定的互斥元的所有權轉移到lock_guard:
std::lock_guard<std::mutex>guard(mutex_1,std::adopt_lock);
我們利用std::lock,解決上面的死鎖問題:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex_1;
std::mutex mutex_2;
void func_1()
{
for (int i = 0; i < 10; i++) {
std::lock(mutex_1, mutex_2);
std::cout << "now is func_1" << std::endl;
std::lock_guard<std::mutex>guard_1(mutex_1,std::adopt_lock);
std::lock_guard<std::mutex>guard_2(mutex_2,std::adopt_lock);
}
std::cout << "func_1" << std::endl;
return;
}
void func_2()
{
for (int i = 0; i < 10; i++) {
std::lock(mutex_2, mutex_1);
std::cout << "now is func_2" << std::endl;
std::lock_guard<std::mutex>guard_2(mutex_2,std::adopt_lock);
std::lock_guard<std::mutex>guard_1(mutex_1,std::adopt_lock);
}
std::cout << "func_2" << std::endl;
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
return 0;
}
然而,死鎖的來源遠不止鎖定.例如下面這個例子:
#include <iostream>
#include <thread>
void func(std::thread& t)
{
t.join();
return;
}
int main()
{
std::thread t1, t2;
std::thread temp_1(func, std::ref(t2));
t1 = std::move(temp_1);
std::thread temp_2(func, std::ref(t1));
t2 = std::move(temp_2);
t1.join();
return 0;
}
其通過兩個線程之間的互相調用實現了死鎖.
上面的例子為我們說明了死鎖現象的防不勝防.為了盡量避免死鎖的出現,我們有下面幾點建議:
- 避免嵌套鎖:如果你已經持有一個鎖,就別再獲取鎖.
- 在持有鎖時,避免調用用戶提供的代碼.
- 以固定順序獲取鎖:在每個線程中以相同順序獲得鎖.
- 使用鎖層次:
鎖層次通過對線程當前層次值,上一次層次值的保存,結合鎖層次值,在不符合鎖層次時拋出logic_error來解決死鎖.
- hierarchical_mutex:
class hierarchical_mutex
{
private:
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value;
void checkForHierarchyViolation()
{
if (this_thread_hierarchy_value <= hierarchy_value)
throw std::logic_error("mutex hierarchy violated");
}
void updateHierarchyValue()
{
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value)
:hierarchy_value(value), previous_hierarchy_value(0){ return; }
void lock()
{
checkForHierarchyViolation();
internal_mutex.lock();
updateHierarchyValue();
}
void unlock()
{
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
checkForHierarchyViolation();
if (!internal_mutex.try_lock())
return false;
updateHierarchyValue();
return true;
}
};
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
下面再給出一個使用鎖層次的實例:
#include <iostream>
#include <thread>
#include <mutex>
hierarchical_mutex mutex_1(1000);
hierarchical_mutex mutex_2(500);
void func_1()
{
try {
for (int i = 0; i < 10; i++) {
std::cout << "now is func_1" << std::endl;
std::lock_guard<hierarchical_mutex>guard_1(mutex_1);
std::lock_guard<hierarchical_mutex>guard_2(mutex_2);
}
}
catch (std::logic_error) {
std::cout << "func_1" << std::endl;
}
return;
}
void func_2()
{
try {
for (int i = 0; i < 10; i++) {
std::cout << "now is func_2" << std::endl;
std::lock_guard<hierarchical_mutex>guard_2(mutex_2);
std::lock_guard<hierarchical_mutex>guard_1(mutex_1);
}
}
catch (std::logic_error) {
std::cout << "func_2" << std::endl;
}
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
return 0;
}
由于func_2處存在不符合的層次值的情況,因而該線程的循環很快終止.也避免了死鎖.
靈活鎖定
通過松弛不變量,std::unique_lock比std::lock_guard提供了更多的靈活性,一個std::unique_lock實例不總是擁有與之相關聯的互斥元.
使用std::unique_lock與std::defer_lock相結合,可以很方便地實現std::lock_guard與std::adopt_lock相結合的效果.
std::adopt_lock表示互斥元已被鎖上,std::defer_lock則表示互斥元暫未被鎖上.
- 將未被鎖定的互斥元記錄到unique_lock:
std::unique_lock<std::mutex> ulock(mutex_1,std::defer_lock);
下面是通過其解決死鎖問題的方式:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex_1;
std::mutex mutex_2;
void func_1()
{
for (int i = 0; i < 10; i++) {
std::unique_lock<std::mutex>ulock_1(mutex_1, std::defer_lock);
std::unique_lock<std::mutex>ulock_2(mutex_2, std::defer_lock);
std::lock(mutex_1, mutex_2);
std::cout << "now is func_1" << std::endl;
}
std::cout << "func_1" << std::endl;
return;
}
void func_2()
{
for (int i = 0; i < 10; i++) {
std::unique_lock<std::mutex>ulock_2(mutex_2, std::defer_lock);
std::unique_lock<std::mutex>ulock_1(mutex_1, std::defer_lock);
std::lock(mutex_2, mutex_1);
std::cout << "now is func_2" << std::endl;
}
std::cout << "func_2" << std::endl;
return;
}
int main()
{
std::thread t_1(func_1);
std::thread t_2(func_2);
t_1.join();
t_2.join();
return 0;
}
因為std::unique_lock實例并沒有擁有與其相關的互斥元,所以通過四處移動(moving)實例,互斥元的所有權可以在實例之間進行轉移.
單一全局實例
設想一個 延遲初始化(lazy initialization) 的例子.這在單線程代碼中很常見:每個請求資源的操作首先檢查它是否已經初始化,如果沒有就在使用之前初始化.
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr)
resource_ptr.reset(new some_resource);
resource_ptr->do_something();
return;
}
一般來說,在其中使用互斥元的做法為:
std::shared_ptr<some_resource>resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex>lk(resource_mutex);
if(!resource_ptr)
resource_ptr.reset(new some_resource);
lk.unlock();
resource_ptr->do_something();
return;
}
然而,這樣會有很大的非必要序列化問題.
于是,有人提出了臭名昭著的 二次檢查鎖定(double-checked locking) 模式.
這一模式已被證明是災難性的.
void double_checked_locking()
{
if(!resource_ptr){
std::lock_guard<std::mutex>lk(resource_mutex);
if(!resource_ptr)
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
return;
}
由于在線程對指針所指向內存進行修改時可能尚未flush,這可能導致數據競爭的問題,新的數據被更新的數據覆蓋.
為了解決上面情景的問題,C++提供了 std::call_once 與 std::once_flag 來處理這種情景.
使用std::call_once可以使得某函數只被一個線程執行,且效率比std::mutex高.
我們通過下面這個例子來說明:
#include <iostream>
#include <format>
#include <thread>
#include <mutex>
#include <omp.h>
std::shared_ptr<double>resource_ptr_1;
std::shared_ptr<double>resource_ptr_2;
std::mutex resource_mutex;
std::once_flag resource_flag;
void init_resource()
{
resource_ptr_2.reset(new double);
return;
}
void foo_1()
{
std::unique_lock<std::mutex>lk(resource_mutex);
if (!resource_ptr_1)
resource_ptr_1.reset(new double);
lk.unlock();
return;
}
void foo_2()
{
std::call_once(resource_flag, init_resource);
return;
}
int main()
{
double temp_time, run_time;
temp_time = omp_get_wtime();
std::thread t1(foo_1);
std::thread t2(foo_1);
t1.join(), t2.join();
run_time = omp_get_wtime() - temp_time;
std::cout << std::format("the runtime_1 is {:.15f}s",
run_time
) << std::endl;;
temp_time = omp_get_wtime();
std::thread t3(foo_2);
std::thread t4(foo_2);
t3.join(), t4.join();
run_time = omp_get_wtime() - temp_time;
std::cout << std::format("the runtime_2 is {:.15f}s",
run_time
) << std::endl;;
return 0;
}
其運行結果為:
the runtime_1 is 0.004862599889748s
the runtime_2 is 0.000809999997728s
在C++中,如果需要單一全局實例,那么還可以通過static變量來實現.
在C++11之前,對static變量的初始化可能造成數據競爭.但是現在static可以用作std::call_once的替代品.
讀寫互斥元
讀寫互斥元(reader-writer): 由單個"寫"線程獨占訪問或共享,由多個"讀"線程并發訪問.
C++標準庫目前沒有直接提供這樣的互斥元,但是boost庫提供了.
- 創建一個共享鎖
mutable boost::shared_mutex entry_mutex;
- 鎖定一個共享鎖
std::lock_guard<boost::shared_mutex> guard(entry_mutex);
- 共享鎖定一個共享鎖
boost::shared_lock<boost::shared_mutex>lk(entry_mutex);
- 獨占鎖定一個共享鎖
std::unique_lock<boost::shared_mutex>lk(entry_mutex);
如果一個線程擁有一個共享鎖,試圖獲取獨占鎖的線程會被阻塞,知道其他線程全部撤回他們的鎖.
如果一個線程擁有獨占鎖,其他線程都不能獲取共享鎖或獨占鎖.
共享鎖可以用于許多情景,其中一個與我們最貼切的情景就是通過并行串口COM進行串口通信時數據的讀寫.
遞歸鎖
在前面第二章我們提到過,對同一個std::mutex進行多次鎖定是一個 未定義行為(undefined behavior).
所以是否存在一個可以多次鎖定的互斥元呢?答案是:是的,那就是遞歸鎖.
- 創建一個遞歸鎖
std::recursive_mutex some_mutex;
這個互斥元是可以鎖定多次的.但是,相對的,當你多次lock后,你也需要多次unlock才能解除對其的鎖定.
在開發中使用遞歸鎖是不推薦的.

浙公網安備 33010602011771號