C++ Multithreading, Chapter 8: Designing Concurrent Code
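Dividing work by data
When data is divided among threads before processing begins, the C++ approach is similar to that of MPI or OpenMP.
A task is split into a set of parallel tasks, the worker threads run those tasks independently, and the results are combined in a final reduction step.
This approach is effective, but it works only when the data can be divided up front.
Consider quicksort. The algorithm has two basic steps: partition the data around a pivot value, so that one part comes before the pivot and the other after it, and then recursively sort the two parts.
Here the data cannot be divided in advance, because each partition is known only after the previous level has been processed. The work should instead be divided recursively.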
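Recursive division can be sketched with std::async before moving to a hand-managed stack. The function name async_quick_sort below is a hypothetical helper, not part of the original text, and the sketch leaves the decision of whether to start a thread to the default launch policy, so it is an illustration under those assumptions rather than a tuned implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <future>
#include <list>
#include <utility>

// Hypothetical helper: sorts a list by recursively splitting the task.
// The lower part is handed to std::async, the upper part stays on this thread.
template <typename T>
std::list<T> async_quick_sort(std::list<T> input)
{
    if (input.size() < 2)
        return input;
    std::list<T> result;
    // Move the first element out of the input to serve as the pivot.
    result.splice(result.begin(), input, input.begin());
    T const& pivot = result.front();
    auto divide_point = std::partition(input.begin(), input.end(),
        [&pivot](T const& value) { return value < pivot; });
    // Everything smaller than the pivot forms the lower part.
    std::list<T> lower;
    lower.splice(lower.end(), input, input.begin(), divide_point);
    // The lower part becomes a new task; the library may run it on another thread.
    std::future<std::list<T>> sorted_lower =
        std::async(&async_quick_sort<T>, std::move(lower));
    std::list<T> sorted_upper = async_quick_sort(std::move(input));
    result.splice(result.end(), sorted_upper);
    result.splice(result.begin(), sorted_lower.get());
    // Debug-only sanity check of the postcondition.
    assert(std::is_sorted(result.begin(), result.end()));
    return result;
}
```

Calling async_quick_sort(std::list<int>{5, 3, 8, 1}) returns the sorted list. Because every level may start a new task, large inputs can create far more tasks than there are hardware threads, which is the motivation for managing the pending chunks explicitly.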
The following parallel quicksort uses a stack of chunks waiting to be sorted:
#include <iostream>
#include <thread>
#include <future>
#include <atomic>
#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <vector>
#include <stack>
#include <list>
template<typename T>
struct sorter {
struct chunk_to_sort {
std::list<T>data;
std::promise<std::list<T>>inner_promise;
};
unsigned const max_thread_count;
// std::stack and std::vector are not thread-safe, so one mutex guards both
std::mutex guard_mutex;
std::stack<chunk_to_sort>chunks;
std::vector<std::thread>threads;
std::atomic<bool>end_of_data;
sorter() :
// hardware_concurrency() may return 0, so keep at least one worker thread
max_thread_count(std::max(std::thread::hardware_concurrency(), 2u) - 1),
end_of_data(false) { return; }
~sorter()
{
// tell the workers to stop, then wait for all of them
end_of_data = true;
for (unsigned i = 0; i < threads.size(); i++)
if(threads[i].joinable())
threads[i].join();
return;
}
void sort_chunk(std::shared_ptr<chunk_to_sort>const& chunk)
{
chunk->inner_promise.set_value(do_sort(chunk->data));
return;
}
void try_sort_chunk()
{
std::shared_ptr<chunk_to_sort>chunk;
{
std::lock_guard<std::mutex>lock(guard_mutex);
// calling top() on an empty stack is undefined behavior
if (chunks.empty())
return;
chunk = std::make_shared<chunk_to_sort>(std::move(chunks.top()));
chunks.pop();
}
sort_chunk(chunk);
return;
}
void sort_thread()
{
while (!end_of_data) {
try_sort_chunk();
std::this_thread::yield();
}
return;
}
std::list<T>do_sort(std::list<T>& chunk_data)
{
if (chunk_data.empty())
return chunk_data;
std::list<T>result;
// the first element becomes the pivot
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_var = *(result.begin());
typename std::list<T>::iterator divide_point = std::partition(chunk_data.begin(),
chunk_data.end(),
[&](T const& val) {return val < partition_var; }
);
// the lower part goes onto the stack for any thread to pick up
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data, chunk_data.begin(), divide_point);
std::future<std::list<T>>new_lower = new_lower_chunk.inner_promise.get_future();
{
std::lock_guard<std::mutex>lock(guard_mutex);
chunks.push(std::move(new_lower_chunk));
if (threads.size() < max_thread_count)
threads.push_back(std::thread(&sorter<T>::sort_thread, this));
}
// the upper part is sorted recursively on the current thread
std::list<T>new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
// while waiting for the lower part, help with pending chunks
while (new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
try_sort_chunk();
result.splice(result.begin(), new_lower.get());
return result;
}
};
template<typename T>
std::list<T>parallel_quick_sort(std::list<T>input)
{
if (input.empty())
return input;
sorter<T>s;
return s.do_sort(input);
}
int main()
{
std::list<double>datas, results;
double temp_data;
for (int i = 0; i < 10; i++) {
std::cin >> temp_data;
datas.push_back(temp_data);
}
results = parallel_quick_sort(datas);
for (auto iter : results)
std::cout << iter << std::endl;
return 0;
}
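In do_sort, each partitioning step keeps the upper part on the current thread, where it is sorted recursively into new_higher, and pushes the lower part onto the stack as a chunk for other threads to process.
try_sort_chunk() processes the chunks on the stack. It is called from both sort_thread() and do_sort(), so a thread that is waiting for its lower part helps with pending chunks instead of idling, which resembles how a thread pool works.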
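Dividing work by task type
Giving each thread a different chunk of data still assumes that every thread does the same work on its chunk.
Another way to divide work is to make threads specialists: different threads perform different tasks.
This way of dividing work follows from separating concerns with concurrency. Each thread has a different task and works independently of the others.
Separating concerns across threads has two main hazards, however. First, the wrong concerns may be separated, which leads to a large amount of data shared between threads. Second, deadlock is possible when different threads end up waiting for each other.
Both cases come down to too much communication between threads.
If a task consists of applying the same sequence of operations to many independent data items, a pipeline can exploit the available concurrency of the system.
Data flows in at one end, passes through a series of operations, and flows out at the other end.
To divide work this way, create a separate thread for each stage of the pipeline, that is, one thread per operation in the sequence. When an operation completes, the data item is put on a queue for the next thread to pick up.
Processing data through a pipeline makes the whole batch take longer, but individual items come out at a smoother, more regular rate.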
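The pipeline structure described above can be sketched with two stages connected by queues. The class int_channel and the function run_pipeline are hypothetical names introduced only for this illustration, and the two stage operations (doubling, then adding one) are arbitrary placeholders. The sketch does not try to be exception safe.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical minimal queue connecting two pipeline stages.
class int_channel {
    std::mutex m;
    std::condition_variable cv;
    std::queue<int> items;
    bool closed = false;
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(m);
            assert(!closed && "push after close");
            items.push(value);
        }
        cv.notify_one();
    }
    // Signals that no more items will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m);
            closed = true;
        }
        cv.notify_all();
    }
    // Blocks until an item is available; returns false once closed and drained.
    bool pop(int& value) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return !items.empty() || closed; });
        if (items.empty())
            return false;
        value = items.front();
        items.pop();
        return true;
    }
};

// Stage 1 doubles each item, stage 2 adds one; each stage runs on its own thread.
std::vector<int> run_pipeline(std::vector<int> const& input)
{
    int_channel stage1_in, stage2_in;
    std::vector<int> output;
    std::thread stage1([&] {
        int v;
        while (stage1_in.pop(v))
            stage2_in.push(v * 2);
        stage2_in.close();  // pass end-of-stream on to the next stage
    });
    std::thread stage2([&] {
        int v;
        while (stage2_in.pop(v))
            output.push_back(v + 1);
    });
    for (int v : input)
        stage1_in.push(v);
    stage1_in.close();
    stage1.join();
    stage2.join();
    return output;  // only read after stage2 has been joined
}
```

With one thread per stage and first-in, first-out queues, the items leave the pipeline in the order they entered it.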
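Factors affecting the performance of concurrent code
The number and structure of processors is the first and most critical factor in the performance of a multithreaded program.
Running more threads than the hardware can execute concurrently wastes processing power on context switching. This is called oversubscription.
Even if you account for every thread in your own program, other programs running at the same time still affect you.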
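One common guard against oversubscription is to cap the thread count at the hardware thread count. The sketch below mirrors the calculation used in the listings later in this chapter; the function name choose_thread_count and the fallback value of 2 are assumptions made for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: picks how many threads to use for `length` items.
// It never exceeds the hardware thread count and never gives a thread
// fewer than `min_per_thread` items.
std::size_t choose_thread_count(std::size_t length, std::size_t min_per_thread)
{
    assert(min_per_thread > 0);
    if (length == 0)
        return 1;
    // Round up so that a partial block still gets a thread.
    std::size_t const max_threads = (length + min_per_thread - 1) / min_per_thread;
    std::size_t const hardware_threads = std::thread::hardware_concurrency();
    // hardware_concurrency() may return 0 when the count is unknown; 2 is a fallback guess.
    return std::min(hardware_threads != 0 ? hardware_threads : std::size_t(2), max_threads);
}
```

This only accounts for the calling program. As noted above, other programs running at the same time can still push the system into oversubscription.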
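If two threads run on different processors and both only read the same data, there is usually no problem.
If one of the threads modifies the data, however, the change takes time to propagate to the other processor's cache.
If one processor is ready to update a value but has to wait because another processor is already updating it and the change must propagate first, the situation is called high contention. If processors rarely have to wait for each other, it is called low contention.
- Cache ping-pong: in a loop with high contention, the shared data is passed back and forth between the caches of the processors. This is called cache ping-pong.
The most effective way to avoid cache ping-pong is to keep two threads from competing for the same memory location wherever possible.
- False sharing: the smallest unit a processor cache handles is usually not a single memory address but a small block of memory called a cache line, typically 32 to 64 bytes. A cache line can therefore be shared between threads even though none of the data in it is shared, and ownership of the line then bounces between processors. This is false sharing.
False sharing can cause cache ping-pong, which we want to avoid.
To reduce the effect of false sharing, divide the data so that items accessed by the same thread are adjacent in memory, and items accessed by different threads are far enough apart to sit on different cache lines.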
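One way to keep per-thread data on separate cache lines is to pad each item to the size of a line. The following is a minimal sketch; the 64-byte line size is an assumption (it is common but not guaranteed), and padded_counter and count_in_parallel are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Assumption: 64 bytes is a common cache-line size; real hardware may differ.
constexpr std::size_t assumed_cache_line = 64;

// alignas rounds the size of the struct up to the assumed line size, so
// neighboring counters in an array are at least one line apart.
struct alignas(assumed_cache_line) padded_counter {
    unsigned long value = 0;
};

// Every thread increments only its own padded slot; the slots are summed at the end.
unsigned long count_in_parallel(unsigned thread_count, unsigned long increments)
{
    assert(thread_count > 0);
    std::vector<padded_counter> counters(thread_count);
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < thread_count; ++i)
        threads.emplace_back([&counters, i, increments] {
            for (unsigned long n = 0; n < increments; ++n)
                ++counters[i].value;
        });
    for (auto& t : threads)
        t.join();
    unsigned long total = 0;
    for (auto const& c : counters)
        total += c.value;
    return total;
}
```

Without the padding, several counters would fit in one cache line and every increment could invalidate the line in the other processors' caches, even though no counter is actually shared.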
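Exception safety in parallel algorithms
Exception safety is an essential aspect of good C++ code.
Parallel algorithms usually require more care with exceptions than ordinary sequential algorithms.
If an operation in a sequential algorithm throws, the algorithm only has to make sure it cleans up after itself, to avoid resource leaks and broken invariants.
In a parallel algorithm, if a function running on a new thread exits with an exception, std::terminate is called and the application is terminated.
Consider the following example: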
#include <iostream>
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
template<typename Iterator,typename T>
struct accumulate_block
{
// result is taken by reference so that the block sum reaches the caller's slot
void operator()(Iterator first, Iterator last, T& result)
{
result = std::accumulate(first, last, result);
return;
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T>results(num_threads);
std::vector<std::thread>threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(accumulate_block<Iterator, T>(),
block_start,
block_end,
std::ref(results[i])
);
block_start = block_end;
}
results[num_threads - 1] = std::accumulate(block_start, last, results[num_threads - 1]);
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(), results.end(), init);
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long temp_data;
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
run_time = (std::chrono::steady_clock::now() - begin_time).count() / 1e9;
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
return 0;
}
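This example gives us a parallel version of std::accumulate(). It is not exception safe, however.
The std::thread constructor, std::accumulate, and accumulate_block can all throw.
The code is therefore revised to address these problems: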
#include <iostream>
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
template<typename Iterator,typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
return std::accumulate(first, last, T());
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<T>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)>the_task((accumulate_block<Iterator, T>()));
futures[i] = the_task.get_future();
threads[i] = std::thread(std::move(the_task), block_start, block_end);
block_start = block_end;
}
T last_result = std::accumulate(block_start, last, T());
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); i++)
result = result + futures[i].get();
result = result + last_result;
return result;
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long temp_data;
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
run_time = (std::chrono::steady_clock::now() - begin_time).count() / 1e9;
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
return 0;
}
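First change: the function call operator of accumulate_block returns the result directly instead of taking a reference to the location where it is stored. std::packaged_task and std::future are used to provide exception safety.
Second change: a std::vector<std::future<T>> holds the futures. When a task runs, its future captures either the result or the exception.
As a result, if more than one worker thread throws, only one exception is propagated.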
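How a future carries an exception from a worker thread back to the caller can be seen in isolation. In this minimal sketch, failing_work and run_and_report are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical worker that always fails, to show how the exception travels.
int failing_work()
{
    throw std::runtime_error("worker failed");
}

// Runs the worker on another thread and reports what the caller observes.
std::string run_and_report()
{
    std::packaged_task<int()> task(failing_work);
    std::future<int> result = task.get_future();
    assert(result.valid());
    std::thread worker(std::move(task));
    worker.join();      // the thread ends normally; the exception is stored in the shared state
    try {
        result.get();   // rethrows the stored exception on the calling thread
    }
    catch (std::runtime_error const& e) {
        return e.what();
    }
    return "no exception";
}
```

Because the packaged_task catches the exception inside the worker thread, the thread function itself never exits with an exception and std::terminate is not called.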
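A std::thread object may still be destroyed without join() having been called, for example when an exception is thrown between creating the threads and joining them, so here is another version: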
#include <iostream>
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
template<typename Iterator,typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
return std::accumulate(first, last, T());
}
};
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) { return; }
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<T>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)>the_task((accumulate_block<Iterator, T>()));
futures[i] = the_task.get_future();
threads[i] = std::thread(std::move(the_task), block_start, block_end);
block_start = block_end;
}
T last_result = std::accumulate(block_start, last, T());
// no explicit join is needed here: joiner joins every thread when it goes out of scope
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); i++)
result = result + futures[i].get();
result = result + last_result;
return result;
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
try {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long temp_data;
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
run_time = (std::chrono::steady_clock::now() - begin_time).count() / 1e9;
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
catch (...) {
std::cout << "something wrong happened" << std::endl;
}
}
return 0;
}
Here the join_threads class manages the threads, and a try-catch block has been added.
Below is a parallel version of std::accumulate() implemented with std::async():
#include <iostream>
#include <chrono>
#include <future>
#include <iterator>
#include <thread>
#include <vector>
#include <numeric>
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
unsigned long const max_chunk_size = 25;
if (length <= max_chunk_size)
return std::accumulate(first, last, init);
else {
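Iterator mid_point = first;
// move mid_point to the middle of the range; leaving it at first makes the recursion endless
std::advance(mid_point, length / 2);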
std::future<T>first_half_result = std::async(
parallel_accumulate<Iterator, T>,
first,
mid_point,
init
);
T second_half_result = parallel_accumulate(mid_point, last, T());
return first_half_result.get() + second_half_result;
}
}
const int NUM = 25;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
try {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long temp_data;
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
run_time = (std::chrono::steady_clock::now() - begin_time).count() / 1e9;
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
catch (...) {
std::cout << "something wrong happened" << std::endl;
}
}
return 0;
}
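This version divides the work recursively into multiple asynchronous tasks.
It is exception safe: if the asynchronous call throws, the future captures the exception and get() rethrows it, and if the direct recursive call throws, the future's destructor waits for the asynchronous task to finish. Note that mid_point has to be advanced to the middle of the range, for example with std::advance(mid_point, length / 2). If it is left equal to first, the second recursive call receives the whole range again, the recursion never terminates, and the stack overflows as soon as NUM exceeds max_chunk_size.
Improving responsiveness with concurrency
Most modern GUI frameworks are event driven: the user performs actions on the user interface through keyboard input or mouse movement, which produces a series of events or messages that the application then handles.
To make sure all events and messages are handled correctly, an application typically has an event loop like this: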
while(true)
{
event_data event=get_event();
if(event.type==quit)
break;
process(event);
}
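The details of the API vary, of course, but the structure is usually the same: wait for an event, process it, then wait for the next one.
By separating concerns with concurrency, a long-running task can be moved to a new thread while a dedicated GUI thread handles events.
Let's revisit what we learned in Chapter 4: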
#include <iostream>
#include <format>
#include <random>
#include <thread>
#include <mutex>
#include <future>
#include <vector>
#include <deque>
#include <windows.h>
#include "include/graphics.h"
#pragma comment(lib,"graphics64.lib")
#define PRESSED(nVirtKey) ((GetKeyState(nVirtKey) & (1<<(sizeof(SHORT)*8-1))) != 0)
#define TOGGLED(nVirtKey) ((GetKeyState(nVirtKey) & 1) != 0)
typedef struct point_2d {
public:
double x, y, r;
}point_2d;
std::mutex lk;
std::deque<std::packaged_task<void(point_2d)>>tasks;
std::deque<point_2d>datas;
void mainGuiTask()
{
ege::initgraph(640, 480);
ege::setcaption(L"parallel draw");
ege::setbkcolor(ege::BLACK);
ege::setcolor(ege::LIGHTGREEN);
for (; ege::is_run(); delay_fps(60)) {
ege::cleardevice();
std::packaged_task<void(point_2d)>gui_task;
point_2d task_data;
{
std::lock_guard<std::mutex>guard(lk);
if (tasks.empty())
continue;
gui_task = std::move(tasks.front());
tasks.pop_front();
task_data = std::move(datas.front());
datas.pop_front();
}
gui_task(task_data);
}
ege::closegraph();
return;
}
template<typename Func>
std::future<void>postTask(Func func)
{
std::packaged_task<void(point_2d)>post_task(func);
std::future<void>res = post_task.get_future();
std::lock_guard<std::mutex>guard(lk);
tasks.push_back(std::move(post_task));
return res;
}
void mainVKboardTask()
{
std::uniform_real_distribution<double> u_x(0.0, 640.0);
std::uniform_real_distribution<double> u_y(0.0, 480.0);
std::uniform_real_distribution<double> u_r(50.0, 150.0);
std::default_random_engine engine(time(0));
point_2d temp_point;
POINT cursor;
double temp_x, temp_y, temp_r;
for (; ege::is_run();) {
if (PRESSED(32)) {
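{
// datas is shared with the GUI thread; push_back keeps it in step with the FIFO task queue
std::lock_guard<std::mutex>guard(lk);
datas.push_back(temp_point);
}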
postTask(
[](point_2d ilist){ ege::circle(
ilist.x,
ilist.y,
ilist.r
);
return;
}
);
GetCursorPos(&cursor);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
temp_x = u_x(engine);
temp_y = u_y(engine);
temp_r = u_r(engine);
temp_point = { temp_x,temp_y,temp_r };
}
return;
}
int main()
{
std::thread gui_thread(mainGuiTask);
std::thread mouse_thread(mainVKboardTask);
gui_thread.join();
mouse_thread.join();
return 0;
}
This program illustrates the general design of a multithreaded GUI program.
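The same idea of posting work to one dedicated thread can be shown without a graphics library. The sketch below is a portable illustration only; the class task_runner and its post member are hypothetical names, and the dedicated thread here simply runs tasks in order rather than drawing anything.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a GUI thread: one thread that runs posted tasks in order.
class task_runner {
    std::mutex m;
    std::condition_variable cv;
    std::deque<std::packaged_task<void()>> tasks;
    bool stopping = false;
    std::thread worker;  // declared last so the other members exist before it starts

    void run() {
        for (;;) {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [this] { return stopping || !tasks.empty(); });
                if (tasks.empty())
                    return;  // stopping was requested and the queue is drained
                task = std::move(tasks.front());
                tasks.pop_front();
            }
            task();  // run outside the lock so posting is never blocked by a task
        }
    }
public:
    task_runner() : worker(&task_runner::run, this) {}
    ~task_runner() {
        {
            std::lock_guard<std::mutex> lock(m);
            stopping = true;
        }
        cv.notify_one();
        worker.join();
    }
    // Queues a task and returns a future that becomes ready when it has run.
    template <typename Func>
    std::future<void> post(Func f) {
        std::packaged_task<void()> task(std::move(f));
        std::future<void> done = task.get_future();
        {
            std::lock_guard<std::mutex> lock(m);
            assert(!stopping);
            tasks.push_back(std::move(task));
        }
        cv.notify_one();
        return done;
    }
};
```

A caller on any thread can write runner.post([] { /* update the display */ }).get() to run a piece of work on the dedicated thread and wait for it, or drop the future when it does not need to wait.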
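Designing concurrent code in practice
A parallel implementation of std::for_each
std::for_each is conceptually simple: it calls a user-supplied function on each element of a range in turn.
To implement a parallel version, divide the range into sets of elements and assign each set to a thread.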
#include <iostream>
#include <algorithm>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<void>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)>task(
[=]() {
std::for_each(block_start, block_end, f);
return;
}
);
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); i++)
futures[i].get();
return;
}
std::mutex lk;
void test_func(int data)
{
std::lock_guard<std::mutex>guard(lk);
std::cout << "the data is:" << data << std::endl;
std::cout << "from thread:" << std::this_thread::get_id() << std::endl;
return;
}
const int SIZE = 1000;
int main()
{
std::vector<int>datas;
for (int i = 0; i < SIZE; i++)
datas.push_back(i);
parallel_for_each(datas.begin(), datas.end(), test_func);
return 0;
}
The above is a parallel version of std::for_each.
The amount of code can be reduced further by dividing the work recursively with std::async.
#include <iostream>
#include <algorithm>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread))
std::for_each(first, last, f);
else {
Iterator const mid_point = first + length / 2;
std::future<void>first_half = std::async(&parallel_for_each<Iterator, Func>,
first,
mid_point,
f
);
parallel_for_each(mid_point, last, f);
first_half.get();
}
return;
}
std::mutex lk;
void test_func(int data)
{
std::lock_guard<std::mutex>guard(lk);
std::cout << "the data is:" << data << std::endl;
std::cout << "from thread:" << std::this_thread::get_id() << std::endl;
return;
}
const int SIZE = 1000;
int main()
{
std::vector<int>datas;
for (int i = 0; i < SIZE; i++)
datas.push_back(i);
parallel_for_each(datas.begin(), datas.end(), test_func);
return 0;
}
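Dividing work with std::async in this way is quite convenient.
A parallel implementation of std::find
std::find is a useful algorithm to consider next, because it is one of several algorithms that can complete without processing every element.
As soon as the first element in the range matches the search criterion, std::find does not need to examine any other elements.
To get the same behavior in parallel, the other threads must be interrupted once a matching element has been found.
One way to signal the interruption is to use an atomic variable as a flag and check the flag after processing each element.
There are two options for returning the value and propagating exceptions: use an array of futures with std::packaged_task to transfer values and exceptions and then process the results in the main thread, or use std::promise to set the final result directly from a worker thread.
For a parallel std::find, std::promise fits the requirements better.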
#include <iostream>
#include <algorithm>
#include <atomic>
#include <cmath>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match)
{
struct find_element
{
void operator()(Iterator begin,
Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try {
for (; (begin != end) && !done_flag->load(); begin++)
if (*begin == match) {
result->set_value(begin);
done_flag->store(true);
return;
}
}
catch (...) {
try {
result->set_exception(std::current_exception());
done_flag->store(true);
}
catch (...) {}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise<Iterator>result;
std::atomic<bool>done_flag(false);
std::vector<std::thread>threads(num_threads - 1);
{
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
block_start = block_end;
}
find_element()(block_start, last, match, &result, &done_flag);
}
if (!done_flag.load())
return last;
return result.get_future().get();
}
const int NUM = 100;
int main()
{
std::vector<double>datas;
double temp_data;
for (int i = 0; i < 100; i++) {
temp_data = sqrt(i);
datas.push_back(temp_data);
}
auto iter = parallel_find(datas.begin(),
datas.end(),
sqrt(15)
);
std::cout << "try to find:" << *iter << std::endl;
std::cout << "the index is:" << (int)(iter - datas.begin()) << std::endl;
return 0;
}
Likewise, the parallel std::find above also has a std::async version:
#include <iostream>
#include <atomic>
#include <cmath>
#include <functional>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match, std::atomic<bool>& done)
{
try {
unsigned long const length = std::distance(first, last);
unsigned long const min_per_threads = 25;
if (length < (2 * min_per_threads)) {
for (; (first != last) && !done.load(); first++)
if (*first == match) {
done = true;
return first;
}
return last;
}
else {
Iterator const mid_point = first + (length / 2);
std::future<Iterator>async_result = std::async(&parallel_find<Iterator,MatchType>, mid_point, last, match, std::ref(done));
Iterator const direct_result = parallel_find(first, mid_point, match, done);
return (direct_result == mid_point) ? async_result.get() : direct_result;
}
}
catch (...) {
done = true;
throw;
}
}
const int NUM = 100;
int main()
{
std::vector<double>datas;
// before C++20 a default-constructed std::atomic<bool> holds an indeterminate value
std::atomic<bool>done(false);
double temp_data;
for (int i = 0; i < 100; i++) {
temp_data = sqrt(i);
datas.push_back(temp_data);
}
auto iter = parallel_find(datas.begin(),
datas.end(),
sqrt(15),
done
);
std::cout << "try to find:" << *iter << std::endl;
std::cout << "the index is:" << (int)(iter - datas.begin()) << std::endl;
return 0;
}
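The std::async version takes one extra parameter, a std::atomic<bool> flag.
This is because every thread needs to share the flag as the work is divided recursively.
A parallel implementation of std::partial_sum
std::partial_sum computes the running totals of a range, so each element is replaced by the sum of that element and all the elements before it.
One way to compute the partial sums of a range is to compute the partial sums of independent chunks, then add the last element of the first chunk's result to every element of the next chunk, and so on.
Just as the original division into chunks can be done in parallel, the addition of the previous chunk's partial sum can be too. If the last element of each chunk is updated first, then the first thread can update the remaining elements of a chunk while a second thread updates the next chunk, and so on. This is similar to the pipeline we covered earlier.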
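The arithmetic of the chunked approach can be checked in a single-threaded form before threads are added. This is a sketch for illustration only; chunked_partial_sum and its block_size parameter are hypothetical and do not appear in the textbook code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical single-threaded walk-through of the chunked approach.
std::vector<int> chunked_partial_sum(std::vector<int> data, std::size_t block_size)
{
    assert(block_size > 0);
    int carry = 0;  // final value of the previous, already finished block
    for (std::size_t start = 0; start < data.size(); start += block_size) {
        std::size_t const stop = std::min(start + block_size, data.size());
        // Step 1: partial sum inside the block, independent of every other block.
        std::partial_sum(data.begin() + start, data.begin() + stop, data.begin() + start);
        // Step 2: add the previous block's final value to every element of this block.
        for (std::size_t i = start; i < stop; ++i)
            data[i] += carry;
        carry = data[stop - 1];
    }
    return data;
}
```

For the input {1, 2, 3, 4, 5, 6, 7} with a block size of 3, the blocks first become {1, 3, 6}, {4, 9, 15}, and {7}. Adding the carried values 6 and 21 then gives {1, 3, 6, 10, 15, 21, 28}, the same result as std::partial_sum over the whole range. In the parallel version, step 1 runs concurrently for all blocks, and only the carry has to pass from one thread to the next.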
#include <iostream>
#include <algorithm>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename ValueType>
void parallel_partial_sum(Iterator first, Iterator last,ValueType temp)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
struct process_chunk
{
void operator()(Iterator begin,
Iterator last,
std::future<ValueType>* previous_end_value,
std::promise<ValueType>* end_value)
{
try {
Iterator end = last;
++end;
std::partial_sum(begin, end, begin);
if (previous_end_value) {
// get() returns a temporary, which cannot bind to a non-const lvalue reference
ValueType const addend = previous_end_value->get();
*last += addend;
if (end_value)
end_value->set_value(*last);
std::for_each(begin,
last,
[addend]( ValueType& item) {item += addend; }
);
}
else if (end_value)
end_value->set_value(*last);
}
catch (...) {
if (end_value)
end_value->set_exception(std::current_exception());
else
throw;
}
}
};
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::thread>threads(num_threads - 1);
std::vector<std::promise<ValueType>>end_values(num_threads - 1);
std::vector<std::future<ValueType>>previous_end_values;
previous_end_values.reserve(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_last = block_start;
std::advance(block_last, block_size - 1);
// each thread receives a pointer to its own promise, not to the whole vector
threads[i] = std::thread(process_chunk(), block_start, block_last, (i != 0) ? &previous_end_values[i - 1] : nullptr, &end_values[i]);
block_start = block_last;
block_start++;
previous_end_values.push_back(end_values[i].get_future());
}
Iterator final_element = block_start;
std::advance(final_element, std::distance(block_start, last) - 1);
process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0, 0);
return ;
}
const int NUM = 100;
int main()
{
std::vector<unsigned long>datas;
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
parallel_partial_sum(datas.begin(), datas.end(),datas[0]);
return 0;
}
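Below is a parallel implementation of partial_sum based on pairwise updates.
The textbook's two parallel std::partial_sum examples both have significant problems, so the code here stays close to the textbook listings rather than presenting a reworked version.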
#include <iostream>
#include <atomic>
#include <functional>
#include <future>
#include <iterator>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
struct barrier
{
std::atomic<unsigned>count, spaces, generation;
barrier(unsigned count_) :
count(count_), spaces(count_), generation(0) { return; }
void wait()
{
unsigned const gen = generation.load();
if (!--spaces) {
spaces = count.load();
generation++;
}
else {
while (generation.load() == gen)
std::this_thread::yield();
}
return;
}
void done_waiting()
{
count--;
if (!--spaces) {
spaces = count.load();
generation++;
}
return;
}
};
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator>
void parallel_partial_sum(Iterator first, Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_element
{
void operator()(Iterator first,
Iterator last,
std::vector<value_type>& buffer,
unsigned i,
barrier& b)
{
value_type& ith_element = *(first + i);
bool update_source = false;
for (unsigned step = 0, stride = 1; stride <= i; step++, stride *= 2) {
value_type const& source = (step % 2) ? buffer[i] : ith_element;
value_type& dest = (step % 2) ? ith_element : buffer[i];
value_type const& addend = (step % 2) ? buffer[i - stride] : *(first + i - stride);
dest = source + addend;
update_source = !(step % 2);
b.wait();
}
if (update_source)
ith_element = buffer[i];
b.done_waiting();
}
};
unsigned long const length = std::distance(first, last);
if (length <= 1)
return;
std::vector<value_type>buffer(length);
barrier b(length);
std::vector<std::thread>threads(length - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (length - 1); i++)
threads[i] = std::thread(process_element(), first, last, std::ref(buffer), i, std::ref(b));
process_element()(first, last, buffer, length - 1, b);
return;
}
const int NUM = 10000;
int main()
{
std::vector<unsigned long>datas;
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
parallel_partial_sum(datas.begin(), datas.end());
for (auto iter : datas)
std::cout << iter << std::endl;
return 0;
}
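The pairwise-update idea may be easier to follow in a single-threaded form that uses two buffers. The sketch below is an illustration under that simplification, not the textbook's implementation: pairwise_partial_sum is a hypothetical name, and the inner loop stands in for what would be one thread per element with a barrier between passes.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical single-threaded walk-through of the pairwise-update scheme.
std::vector<unsigned long> pairwise_partial_sum(std::vector<unsigned long> current)
{
    std::vector<unsigned long> next(current.size());
    for (std::size_t stride = 1; stride < current.size(); stride *= 2) {
        // One pass: element i adds the element `stride` positions before it.
        // All reads come from `current` and all writes go to `next`, so the
        // updates within a pass do not interfere with each other.
        for (std::size_t i = 0; i < current.size(); ++i)
            next[i] = (i >= stride) ? current[i] + current[i - stride] : current[i];
        std::swap(current, next);
    }
    return current;
}
```

For {1, 2, 3, 4} the first pass (stride 1) yields {1, 3, 5, 7} and the second pass (stride 2) yields {1, 3, 6, 10}. Elements that are already final are copied forward on every pass, so the buffer being read always holds up-to-date values for every index.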
