C++多線程 第四章 同步并發(fā)操作
第四章 同步并發(fā)操作
等待事件
設想一個情景:你正坐在一輛從哈爾濱駛向郴州的綠皮火車上,這趟車需要耗時2天2夜,合計3000公里的路程.
于是在這里,我們將你和司機視作為兩個線程.你的任務是在目的地下車,司機的任務是將車開到目的地.
假設你和司機坐在同一個車廂內(nèi),并且你是個不說話就會死的話癆( ) 司機:倒了八輩子血霉.
綠皮火車作為共享資源由互斥元所保護,而這意味著你將較長時間內(nèi)無法行動.
于是,這里你將有幾個策略來解決這個問題:
-
你可以一直在司機的耳旁喋喋不休討論是否到達了目的地.
選擇這一選擇,你需要硬撐兩天兩夜并且影響司機開車的效率,更不用提司機可能嫌你太吵而把你丟出車外. -
你可以設一個約兩天兩夜的鬧鐘.
然后喝下昏睡紅茶.
選擇這一選擇,你可能會提前或延遲醒來,提前醒來后你可能仍會去打擾司機,而延遲醒來你就等著補票+改簽吧. -
你可以跟司機溝通好,讓司機到達后通知你醒來.
然后喝下昏睡紅茶.
這是最佳的選擇,因為這樣可以讓你在合適的時間醒來而又不至于惹怒火車司機.
上面所描述的情景,便是一個經(jīng)典的 等待事件情景.
其中,1對應著自旋鎖;2對應著有粗略時間預測的線程休眠;3對應著條件變量機制.
- 令線程睡眠指定時長:
std::this_thread::sleep_for(<時長>);
- 令線程睡眠到執(zhí)行時間點:
std::this_thread::sleep_until(<時間點>);
條件變量
C++提供了兩個條件變量的實現(xiàn):std::condition_variable 和 std::condition_variable_any.
這兩個實現(xiàn)都在<condition_variable>中實現(xiàn).
兩者都需要與互斥元一起工作以提供恰當?shù)耐?前者僅限于std::mutex,后者則可以為各類互斥元.
下面是一個使用條件變量的例子:
std::queue<double>data_queue;
std::condition_variable data_cond;
std::mutex lk;
const int SIZE = 100;
void data_preparation()
{
for (is_run()) {
double data = sqrt(i);
std::lock_guard<std::mutex>guard(lk);
data_queue.push(data);
data_cond.notify_one();
}
return;
}
void data_process()
{
double temp_data;
while (true) {
std::unique_lock<std::mutex>ul(lk);
data_cond.wait(ul, [] {return !data_queue.empty(); });
temp_data = data_queue.front();
data_queue.pop();
ul.unlock();
std::cout << std::format("the temp_data is {:..15f}",
temp_data
) << std::endl;
if (!preparaing_data())
break;
}
return;
}
條件變量可以多次檢測使用,如果等待線程只打算等待一次,那么條件為true時它就不會再等待這個條件變量了,因而可以使用 期值(future).
期值與異步
C++標準庫通過future為類一次性事件進行建模.如果一個線程需要等待特定的一次性事件,那么它就會獲得一個future來代表這一事件.
C++標準庫中有兩類future,是由
- 唯一future(unique futures,std::future<>)
- 共享future(shared futures,std::shared_future<>)
它們是參照std::unique_ptr和std::shared_ptr建立的.
一個共享的future可以很好地用來線程間的通信.
通常而言,std::future 的使用需要和 std::async 配合.
在不需要立刻得到結果的時候,可以使用std::async來啟動一個 異步任務(asynchronous task).
std::async返回一個std::future對象,只要你在std::future對象上調(diào)用get(),線程就會阻塞直至future就緒.
下面是使用期值的一個實例:
#include <iostream>
#include <format>
#include <thread>
#include <future>
long fib(int n)
{
if (n == 1 || n == 0)
return 1;
return fib(n - 1) + fib(n - 2);
}
int main()
{
std::future<long>result = std::async(fib, 42);
for (int i = 0; i < 10; i++)
std::cout << "main thread is waiting" << std::endl;
std::cout << std::format("the result is {}",
result.get()
) << std::endl;
return 0;
}
std::async實際上還有一個參數(shù)用于決定其是否啟動一個新線程.
當該參數(shù)指定為 std::launch::deferred 時,表明該函數(shù)調(diào)用會延遲至get()或await()執(zhí)行而不啟動新線程.
當該參數(shù)指定為 std::launch::async 時,表明該函數(shù)會啟動新線程來處理異步任務.
而默認情況下,該參數(shù)為 std::launch::deferred|std::launch::async,由具體實現(xiàn)來選擇.
下面是一個小小的實驗:
#include <iostream>
#include <format>
#include <thread>
#include <future>
long fib(int n)
{
if (n == 1 || n == 0)
return 1;
long temp = fib(n - 1) + fib(n - 2);
if (temp == 14930352)
std::cout << std::format("now the temp is {},and id is {}",
temp,
std::this_thread::get_id()
) << std::endl;
return temp;
}
int main()
{
std::future<long>result_1 = std::async(std::launch::deferred,fib, 35);
std::future<long>result_2 = std::async(std::launch::async, fib, 35);
std::cout << std::format("the main thread id is {}",
std::this_thread::get_id()
) << std::endl;
for (int i = 0; i < 10; i++)
std::cout << "main thread is waiting" << std::endl;
std::cout << std::format("the result_1 is {},the result_2 is {}",
result_1.get(),
result_2.get()
) << std::endl;
return 0;
}
運行結果如下:
the main thread id is 8432
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
main thread is waiting
now the temp is 14930352,and id is 123304
now the temp is 14930352,and id is 8432
the result_1 is 14930352,the result_2 is 14930352
將任務與期值關聯(lián)
std::packaged_task<> 將一個std::future綁定到一個函數(shù)或可調(diào)用對象上,當std::packaged_task<>對象被調(diào)用時,它就調(diào)用相關聯(lián)的函數(shù)或可調(diào)用對象,并且讓future就緒,將返回值作為關聯(lián)數(shù)據(jù)儲存.
std::packaged_task<>類模板的模板參數(shù)為函數(shù)簽名,例如int(std::string&).
當你構造std::packaged_task實例的時候,你必須傳入一個函數(shù)或可調(diào)用對象,類型無需嚴格匹配(隱式轉換).
為了更好地理解std::packaged_task的使用,下面給出一個例子:
#include <iostream>
#include <thread>
#include <future>
#define ADD [](int a,int b){return a+b;}
void task_thread(int x, int y)
{
std::packaged_task<int(int, int)> task(ADD);
std::future<int> result = task.get_future();
task(x, y);
std::cout << "task_thread main thread:" << result.get() << std::endl;
task.reset();
result = task.get_future();
std::thread td(move(task), x, y);
td.join();
std::cout << "task_thread aysnc:" << result.get() << std::endl;
}
int main()
{
std::cout << "please input x and y:" << std::endl;
int x, y;
std::cin >> x >> y;
task_thread(x,y);
return 0;
}
通過對該例子的理解,我們發(fā)現(xiàn):所謂的std::packaged_task實際上就是一個函數(shù)與期值的封裝.
同時,我們發(fā)現(xiàn)可以使用 std::packaged_task<>::reset() 來重置共享狀態(tài),這無疑為其提供了良好的可復用性.
下面的例子為我們揭示了一般GUI程序中其他線程如何與繪圖線程結合:
#include <iostream>
#include <format>
#include <random>
#include <thread>
#include <mutex>
#include <future>
#include <vector>
#include <deque>
#include <windows.h>
#include "include/graphics.h"
#pragma comment(lib,"graphics64.lib")
#define PRESSED(nVirtKey) ((GetKeyState(nVirtKey) & (1<<(sizeof(SHORT)*8-1))) != 0)
#define TOGGLED(nVirtKey) ((GetKeyState(nVirtKey) & 1) != 0)
typedef struct point_2d {
public:
double x, y, r;
}point_2d;
std::mutex lk;
std::deque<std::packaged_task<void(point_2d)>>tasks;
std::deque<point_2d>datas;
void mainGuiTask()
{
ege::initgraph(640, 480);
ege::setcaption(L"parallel draw");
ege::setbkcolor(ege::BLACK);
ege::setcolor(ege::LIGHTGREEN);
for (; ege::is_run(); delay_fps(60)) {
ege::cleardevice();
std::packaged_task<void(point_2d)>gui_task;
point_2d task_data;
{
std::lock_guard<std::mutex>guard(lk);
if (tasks.empty())
continue;
gui_task = std::move(tasks.front());
tasks.pop_front();
task_data = std::move(datas.front());
datas.pop_front();
}
gui_task(task_data);
}
ege::closegraph();
return;
}
template<typename Func>
std::future<void>postTask(Func func)
{
std::packaged_task<void(point_2d)>post_task(func);
std::future<void>res = post_task.get_future();
std::lock_guard<std::mutex>guard(lk);
tasks.push_back(std::move(post_task));
return res;
}
void mainVKboardTask()
{
std::uniform_real_distribution<double> u_x(0.0, 640.0);
std::uniform_real_distribution<double> u_y(0.0, 480.0);
std::uniform_real_distribution<double> u_r(50.0, 150.0);
std::default_random_engine engine(time(0));
point_2d temp_point;
POINT cursor;
double temp_x, temp_y, temp_r;
for (; ege::is_run();) {
if (PRESSED(32)) {
datas.push_front(temp_point);
postTask(
[](point_2d ilist){ ege::circle(
ilist.x,
ilist.y,
ilist.r
);
return;
}
);
GetCursorPos(&cursor);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
temp_x = u_x(engine);
temp_y = u_y(engine);
temp_r = u_r(engine);
temp_point = { temp_x,temp_y,temp_r };
}
return;
}
int main()
{
std::thread gui_thread(mainGuiTask);
std::thread mouse_thread(mainVKboardTask);
gui_thread.join();
mouse_thread.join();
return 0;
}
即通過一個std::packaged_task隊列進行通信.
預示
在前面,我們注意到在std::packaged_task的使用中,std::packaged_task與函數(shù)相綁定,std::future通過與std::packaged_task綁定來等待任務完成.
而這里我們要引入 std::promise.
我們知道,當有一個需要處理大量網(wǎng)絡連接的應用程序時,通常傾向于在獨立的線程上分別處理每一個連接.但隨著連接數(shù)的增加,大量線程會消耗大量操作系統(tǒng)資源,并可能導致大量的上下文切換.
因而,在具有超大量網(wǎng)絡連接的應用程序中,通常用少量線程來處理連接,每個線程一次處理多個連接.考慮這類連接,數(shù)據(jù)包將以基本上隨機的順序來自待處理的各個連接,以基本上隨機的順序排隊發(fā)送.
std::promise<> 提供了一種設置值方式,它可以在這之后通過相關聯(lián)的std::future對象進行讀取.等待中的線程可以阻塞std::future,同時提供數(shù)據(jù)的線程可以使用配對中的std::promise來設置值以令std::future就緒.
下面是使用std::promise的一個簡單例子:
#include <iostream>
#include <future>
#include <chrono>
void threadFun1(std::promise<int>& p)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
int iVal = 233;
std::cout << "input:" << iVal << std::endl;
p.set_value(iVal);
return;
}
void threadFun2(std::future<int>& f)
{
auto iVal = f.get();
std::cout << "receive:" << iVal << std::endl;
return;
}
int main()
{
std::promise<int> pr1;
std::future<int> fu1 = pr1.get_future();
std::thread t1(threadFun1, std::ref(pr1));
std::thread t2(threadFun2, std::ref(fu1));
t1.join();
t2.join();
return 0;
}
不難發(fā)現(xiàn),std::promise為線程間通信提供了一種合適的機制.
然而,需要補充的:如果銷毀std::promise時未設置值,則會存入一個異常.
為future保存異常
考慮下面這樣一個情景:
double square_root(double x)
{
if(x<0)
throw std::out_of_range("x<0");
return sqrt(x);
}
其單線程的版本為:
double y=square_root(-1);
而假若以異步的形式調(diào)用,有:
std::future<double>f=std::async(square_root,-1);
double y=f.get();
兩者行為完全一致自然是最理想的.
但是,事實上,實際情況是: 如果作為std::async一部分的函數(shù)調(diào)用引發(fā)了異常,該異常會被存儲在future中,代替所存儲的值,future變?yōu)榫途w,并且對get()的調(diào)用會引發(fā)所存儲的異常.
這同樣發(fā)生在std::packaged_task發(fā)生異常時.
而std::promise也提供了一種顯式方式存儲異常:使用set_exception()而不是set_value來使std::future存儲異常.
std::promise<double>some_promise;
try{
some_promise.set_value(calculate_value());
}
catch(...){
some_promise.set_exception(std::current_exception());
}
std::current_exception用于獲得已引發(fā)的異常,而std::copy_exception()則可以在不引發(fā)的情況下直接存儲新的異常.例如:
some_promise.set_exception(std::copy_exception(std::logic_error("foo")));
等待自多個線程
盡管std::future能處理從一個線程向另一個線程轉移數(shù)據(jù)所需的全部必須的同步,但是get()卻會移動資源的所有權.
因而,為了讓多個線程能夠等待同一個時間,應該使用 std::shared_future.
std::future是可移動的,std::shared_future是可復制的.
然而,為了避免數(shù)據(jù)競爭,對std::shared_future進行共享仍然需要鎖的保護.
需要補充的,引用了異步狀態(tài)的std::shared_future實例可以通過引用這些狀態(tài)的std::future實例來構造.
但是,從std::future到std::shared_future實際上發(fā)生了隱式的所有權轉移.
而且std::future本身有一個方法std::future::share()用于顯式轉換為std::shared_future.
std::promise<double>pr;
std::shared_future<double>fu = pr.get_future().share();
有時間限制的等待
時鐘
就C++標準庫所關注而言,時鐘是時間信息的來源.
時鐘的當前時間可以通過該時鐘類的靜態(tài)成員now()來獲取.
例如std::chrono::system_clock::now()返回系統(tǒng)時鐘的當前時間.
時鐘的節(jié)拍周期是由分數(shù)秒決定的.
如果一個時鐘以均勻速率計時且不能被調(diào)整,則該時鐘被稱為勻速(steady)時鐘.如果時鐘是勻速的,則時鐘類的is_steady靜態(tài)數(shù)據(jù)成員為true.
通常而言,std::chrono::system_clock是不勻速的,因為時鐘可以調(diào)整.
如果需要勻速時鐘,使用std::chrono::system_clock.
時間段
時間段均由std::chrono::duration<>類模板處理
標準庫在std::chrono命名空間中為各種時間提供了一組預定義的typedef,包括nanoseconds,microseconds,milliseconds,seconds,minutes和hours.
在無需截斷值的場合,時間段之間的轉換是隱式的,顯式轉換可以通過std::chrono::duration_cast實現(xiàn)
時間段支持算術運算.
基于時間段的等待是通過 std::chrono::duration<> 實現(xiàn)的.例如:
std::future<int>f=std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)
do_something_with(f.get());
這個例子表示等待future最多35毫秒.
如果等待超時,將返回std::future_status::timeout,否則返回std::future_status::ready.
如果任務推遲,那么返回std::future_status::deferred.
時間點
時間點通過std::chrono::time_point<>類模板實例來表示.
時間點的值是時間的長度,因而一個特定時間點被稱為時鐘的紀元.
時鐘可以共享紀元或者擁有獨立的紀元.
下面這個例子演示了一個具有超時的條件變量的使用.
#include <iostream>
#include <condition_variable>
#include <future>
#include <mutex>
#include <chrono>
#include <omp.h>
std::condition_variable cv;
std::mutex m;
bool done;
bool wait_loop()
{
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(1000);
std::unique_lock<std::mutex>lk(m);
while (!done)
if (cv.wait_until(lk, timeout) == std::cv_status::timeout)
break;
return done;
}
int main()
{
double begin_time = omp_get_wtime();
std::future<bool>fu = std::async(std::launch::async, wait_loop);
if (fu.get())
std::cout << "task done" << std::endl;
else
std::cout << "time out" << std::endl;
double run_time = omp_get_wtime() - begin_time;
std::cout << "time:" << run_time << std::endl;
return 0;
}
接受超時的函數(shù)
超時的最簡單用法,是將延遲添加到特定線程的處理過程中,一邊在他無所事事時避免占用其他線程的處理時間.
例如:
- std::this_thread::sleep_for()
- std::this_thread::sleep_until()
睡眠并不是唯一接受超時的工具,事實上future也可以與超時結合使用.
如果互斥元支持的話,甚至可以試圖在互斥元獲得鎖時使用超時.
例如:
- std::timed_mutex
- std::recursive_timed_mutex
這兩種類型均支持try_lock_for()和try_lock_until()成員函數(shù),它們可以在指定時間段內(nèi)或指定時間點之前嘗試獲取所.
接受超時的函數(shù):
| 類/名稱空間 | 函數(shù) | 返回值 |
|---|---|---|
| std::this_thread命名空間 | sleep_for(duration) sleep_until(time_point) |
none |
| std::condition_variable std::condition_variable_any |
wait_for(lock,duration) wait_until(lock,time_point) |
std::cv_status::timeout std::cv_status::no_timeout |
| std::timed_mutex std::recursive_timed_mutex |
try_lock_for(duration) try_lock_until(time_point) |
bool-true獲得鎖 |
| std::unique_lock<TimedLockable> | unique_lock(lockable,duration) unique_lock(lockable,time_point) |
bool-true獲得鎖 |
| std::future<ValueType> std::shared_future<ValueType> |
wait_for(duration) wait_until(time_point) |
std::future_status::timeout超時 std::future_status::ready就緒 std::future_status::deferred還未開始 |
操作同步
函數(shù)式編程(functional programming,FP): 一種編程風格,函數(shù)調(diào)用的結果僅單純依賴于該函數(shù)的參數(shù)而不依賴于任何外部狀態(tài).
函數(shù)式編程意味著以同樣參數(shù)運行同一個函數(shù)多次將得到相同的結果.
為了說明FP編程的思想,我們在這里通過一個簡單的快速排序實現(xiàn)來說明:
template<typename T>
std::list<T>sequential_quick_sort(std::list<T>input)
{
if(input.empty)
return input;
std::list<T>result;
result.splice(result.begin(),input,input.begin());
T const& pivot = *result.begin();
auto divide_point=std::partition(
input.begin(),
input.end(),
[&](T const&t){return t<pivot;}
);
std::list<T>lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
auto new_lower(sequential_quick_sort(std::move(lower_part)));
auto new_higher(sequential_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower);
return result;
}
由于采用了FP式編程風格,現(xiàn)在我們可以通過future輕易將其并行化.
template<typename T>
std::list<T>parallel_quick_sort(std::list<T>input)
{
if(input.empty)
return input;
std::list<T>result;
result.splice(result.begin(),input,input.begin());
T const& pivot = *result.begin();
auto divide_point=std::partition(
input.begin(),
input.end(),
[&](T const&t){return t<pivot;}
);
std::list<T>lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
std::future<std::list<T>>new_lower(std::async(
¶llel_quick_sort<T>,
std::move(lower_part))
);
auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower.get());
return result;
}
然而,與其使用std::async()不如自行編寫spawn_task()函數(shù)作為std::packaged_task和std::thread的簡單封裝.
template<typename F,typename A>
std::future<std::result_of<F(A&&)>::type>spawn_task(F&& f,A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)>task(std::move(f));
std::future<result_type>res(task.get_future());
std::thread t(std::move(task),std::move);
t.detach();
return res;
}
函數(shù)式編程并不是唯一的避開共享可變數(shù)據(jù)的并發(fā)編程范式;另一種范式為CSP(Communicating Sequential Process,通信順序處理),這種范式下線程在概念上獨立,沒有共享數(shù)據(jù),但是具有允許消息在它們之間進行傳遞的通信通道.
具有消息傳遞的同步
CSP的實現(xiàn)很簡單:若沒有共享數(shù)據(jù),則每個線程可以完全獨立地推理得到,只需基于它對所接收到的消息如何進行反應.
因而每個線程實際上可以等效為一個狀態(tài)機:當它接收到消息時,它會根據(jù)初始狀態(tài)進行操作,并以某種方式更新其狀態(tài),且可能想其他線程發(fā)送一個或多個消息.
編寫這種線程的一種方式,是將其形式化并實現(xiàn)一個有限狀態(tài)機模型.

浙公網(wǎng)安備 33010602011771號