C++多線程 第九章 高級線程管理
第九章 高級線程管理
注意:本章內容由于教材本身問題,例子存在較大問題.請自行在理解基礎上重新設計.
在大多數系統上面,為每個可以與其他任務并行執行的任務分配一個單獨的線程是不切實際的.
但線程池允許盡量充分利用硬件提供的并發性.
在線程池幫助下,可以被并發執行的任務被提交到線程池中,在線程池中被放入一個等待隊列.
每個任務都會被某個工作線程從等待隊列中取出來執行.
工作線程的任務就是當空閑時從等待隊列中取出任務來執行.
最簡單的線程池
線程池最簡單的形式是一個含有固定數量工作線程來處理任務的對象.
當有任務要處理的時候,調用一個函數將任務放到等待隊列中.
每個工作線程都是從該隊列中取出任務,執行完任務后繼續從等待隊列取出更多任務來處理.
下面是一個最簡單的線程池的實現:
#include <iostream>
#include <thread>
#include <future>
#include <atomic>
#include <vector>
#include <queue>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class thread_pool
{
std::atomic_bool done;
std::queue<std::function<void()>>work_queue;
std::vector<std::thread>threads;
join_threads joiner;
void worker_thread()
{
while (!done) {
std::function<void()>task=work_queue.front();
work_queue.pop();
if (task)
task();
else
std::this_thread::yield();
}
return;
}
public:
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++)
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
catch (...) {
done = true;
throw;
}
return;
}
~thread_pool()
{
done = true;
return;
}
template<typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f));
return;
}
};
為了使用線程池,只需要將需要執行的任務submit至任務隊列即可.
在許多情況下,這樣一個簡單的線程池已經足夠使用.
但是在這種情況下很可能會引發死鎖等問題.
在簡單情況下,使用std::async分治可能會是更好的解決方法.
等待線程池的任務
與一般并行程序不同,使用線程池之后,需要等待提交到線程池的任務結束,而不是等待工作線程.
一般并行程序基于std::async實現,而線程池中必須人為使用條件變量來實現.
通過將復雜度移到線程中,可以直接等待任務的結束.
可以讓submit()函數返回一個任務句柄,利用這個句柄可以等待任務結束.
這個任務句柄包裝了條件變量或者其他用來簡化線程池使用的代碼.
下面是如此的一個線程池的代碼:
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <vector>
#include <queue>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class function_wrapper
{
struct impl_base{
virtual void call() = NULL;
virtual ~impl_base() { return; }
};
std::unique_ptr<impl_base>impl;
template<typename F>
struct impl_type:impl_base
{
F f;
impl_type(F&& f_) :
f(std::move(f_)) { return; }
void call()
{
f();
return;
}
};
public:
template<typename F>
function_wrapper(F&& f) :
impl(new impl_type<F>(std::move(f))) { return; }
function_wrapper() = default;
function_wrapper(function_wrapper&& other) :
impl(std::move(other.impl)) { return; }
void operator()()
{
impl->call();
return;
}
function_wrapper& operator= (function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool
{
std::queue<function_wrapper>work_queue;
std::vector<std::thread>threads;
join_threads joiner;
void worker_thread()
{
while (!done) {
if (!work_queue.empty()) {
function_wrapper task = std::move(work_queue.front());
work_queue.pop();
task();
}
else
if(work_queue.empty())
std::this_thread::yield();
}
}
public:
std::atomic_bool done;
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()>task(std::move(f));
std::future<result_type>res(task.get_future());
work_queue.push(std::move(task));
return res;
}
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++)
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
catch (...) {
done = true;
throw;
}
return;
}
~thread_pool()
{
done = true;
return;
}
};
為了演示如何使用該線程池,下面通過一個parallel accumulate來實現.
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <numeric>
#include <vector>
#include <queue>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class function_wrapper
{
struct impl_base{
virtual void call() = NULL;
virtual ~impl_base() { return; }
};
std::unique_ptr<impl_base>impl;
template<typename F>
struct impl_type:impl_base
{
F f;
impl_type(F&& f_) :
f(std::move(f_)) { return; }
void call()
{
f();
return;
}
};
public:
template<typename F>
function_wrapper(F&& f) :
impl(new impl_type<F>(std::move(f))) { return; }
function_wrapper() = default;
function_wrapper(function_wrapper&& other) :
impl(std::move(other.impl)) { return; }
void operator()()
{
impl->call();
return;
}
function_wrapper& operator= (function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool
{
std::queue<function_wrapper>work_queue;
std::vector<std::thread>threads;
join_threads joiner;
void worker_thread()
{
while (!done) {
if (!work_queue.empty()) {
function_wrapper task = std::move(work_queue.front());
work_queue.pop();
task();
}
else
if(work_queue.empty())
std::this_thread::yield();
}
}
public:
std::atomic_bool done;
virtual void run_pending_task();
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()>task(std::move(f));
std::future<result_type>res(task.get_future());
work_queue.push(std::move(task));
return res;
}
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++)
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
catch (...) {
done = true;
throw;
}
return;
}
~thread_pool()
{
done = true;
return;
}
};
void thread_pool::run_pending_task()
{
if (!work_queue.empty()) {
function_wrapper task = std::move(work_queue.front());
work_queue.pop();
task();
}
else
std::this_thread::yield();
return;
}
template<typename Iterator, typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
return std::accumulate(first, last, T());
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
unsigned long const num_blocks = (length + block_size - 1) / block_size;
std::vector<std::future<T>>futures(num_blocks - 1);
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
auto block_accumulate = [&]() -> T {
return std::accumulate(block_start, block_end, T());
};
futures[i] = pool.submit(block_accumulate);
block_start = block_end;
}
T last_result = accumulate_block<Iterator, T>()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); i++)
result = result + futures[i].get();
result = result + last_result;
return result;
}
int main()
{
std::vector<double>datas;
double result;
try {
for (int i = 0; i < 10; i++)
datas.push_back(sqrt(i));
for (auto iter : datas)
std::cout << iter << " " << std::ends;
std::cout << std::endl;
result = parallel_accumulate(datas.begin(), datas.end(), 0.0);
std::cout << result << std::endl;
}
catch (...) {
std::cout << "wow,something wrong." << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return 0;
}
等待其他任務的任務
當使用線程池來管理任務列表及關聯的線程時,不必通過顯示訪問任務列表來完成.需要做的是修改線程池結構以自動完成這個.
最簡單的訪問來完成這個功能的是在線程池中增加一個新的函數來執行隊列中的任務以及自己管理循環.
高級線程池的實現可能會是在等待函數添加邏輯來處理這種情形,有可能是通過在等待的任務賦予優先級來解決.
下面是一個基于線程池的快速排序,這是教材所提供的內容.
值得注意的是,它與前面一章的內容一樣出現了內部編譯器錯誤.(教材這幾章的錯誤有些多了)
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <type_traits>
#include <numeric>
#include <algorithm>
#include <vector>
#include <list>
#include <queue>
class join_threads {
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class function_wrapper {
struct impl_base {
virtual void call() = NULL;
virtual ~impl_base() { return; }
};
std::unique_ptr<impl_base>impl;
template<typename F>
struct impl_type : public impl_base {
F f;
impl_type(F&& f_) :
f(std::move(f_)) {
return;
}
void call()
{
f();
return;
}
};
public:
template<typename F>
function_wrapper(F&& f) :
impl(new impl_type<F>(std::move(f))) {
return;
}
function_wrapper() = default;
function_wrapper(function_wrapper&& other) :
impl(std::move(other.impl)) {
return;
}
void operator()()
{
impl->call();
return;
}
function_wrapper& operator= (function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool {
std::queue<function_wrapper>work_queue;
std::vector<std::thread>threads;
join_threads joiner;
void worker_thread()
{
while (!done) {
if (!work_queue.empty()) {
function_wrapper task = std::move(work_queue.front());
work_queue.pop();
task();
}
else
if (work_queue.empty())
std::this_thread::yield();
}
}
public:
std::atomic_bool done;
virtual void run_pending_task();
template<typename FunctionType>
std::future<typename std::invoke_result<FunctionType()>::type>
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()>task(std::move(f));
std::future<result_type>res(task.get_future());
work_queue.push(std::move(task));
return res;
}
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++)
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
catch (...) {
done = true;
throw;
}
return;
}
~thread_pool()
{
done = true;
return;
}
};
void thread_pool::run_pending_task()
{
if (!work_queue.empty()) {
function_wrapper task = std::move(work_queue.front());
work_queue.pop();
task();
}
else
std::this_thread::yield();
return;
}
template<typename T>
struct sorter {
thread_pool pool;
std::list<T> do_sort(std::list<T>& chunk_data)
{
if (chunk_data.empty())
return chunk_data;
std::list<T> result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
typename std::list<T>::iterator divide_point = std::partition(
chunk_data.begin(),
chunk_data.end(),
[&](T const& val) {
return val < partition_val;
}
);
std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(), chunk_data, chunk_data.begin(), divide_point);
std::future<std::list<T>> new_lower = pool.submit(
[this, new_lower_chunk = std::move(new_lower_chunk)]() {
return do_sort(new_lower_chunk);
}
);
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
new_lower.wait();
result.splice(result.begin(), new_lower.get());
return result;
}
std::list<T> parallel_quick_sort(std::list<T> input)
{
if (input.empty())
return input;
sorter<T> s;
return s.do_sort(input);
}
};
int main()
{
std::list<double>datas;
sorter<double> temp_sorter;
double temp_data;
try {
for (int i = 0; i < 10; i++) {
std::cin >> temp_data;
datas.push_back(temp_data);
}
std::cout << "before arrange" << std::endl;
for (auto iter : datas)
std::cout << iter << " " << std::ends;
std::cout << std::endl;
temp_sorter.parallel_quick_sort(datas);
std::cout << "after arranged" << std::endl;
for (auto iter : datas)
std::cout << iter << " " << std::ends;
std::cout << std::endl;
}
catch (...) {
std::cout << "wow,something wrong." << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return 0;
}
避免工作隊列上的競爭
每次線程調用submit()時,它向單個共享工作隊列添加一個新的元素.
隨著處理器樹木的增加,工作隊列的競爭會越來越多,這回極大地降低性能.及
即使使用無鎖隊列,乒乓緩存仍然會導致非常耗時.
避免乒乓緩存的一個方法是在每個線程都使用一個單獨的工作隊列.
每個線程將新的任務添加到它自己的隊列中,只有當自己隊列為空的時候才從全局的工作隊列中取任務.
下面便是經過了如此修改后的線程池:
class join_threads {
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class function_wrapper {
struct impl_base {
virtual void call() = NULL;
virtual ~impl_base() { return; }
};
std::unique_ptr<impl_base>impl;
template<typename F>
struct impl_type : public impl_base {
F f;
impl_type(F&& f_) :
f(std::move(f_)) {
return;
}
void call()
{
f();
return;
}
};
public:
template<typename F>
function_wrapper(F&& f) :
impl(new impl_type<F>(std::move(f))) {
return;
}
function_wrapper() = default;
function_wrapper(function_wrapper&& other) :
impl(std::move(other.impl)) {
return;
}
void operator()()
{
impl->call();
return;
}
function_wrapper& operator= (function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool
{
std::queue<function_wrapper>pool_work_queue;
typedef std::queue<function_wrapper>local_queue_type;
static thread_local std::unique_ptr<local_queue_type>local_work_queue;
std::queue<function_wrapper>work_queue;
std::vector<std::thread>threads;
join_threads joiner;
void worker_thread()
{
local_work_queue.reset(new local_queue_type);
while (!done)
run_pending_task();
}
public:
std::atomic_bool done;
template<typename FunctionType>
std::future<typename std::invoke_result<FunctionType()>::type>submit(FunctionType f)
{
typedef typename std::invoke_result<FunctionType()>::type result_type;
std::packaged_task<result_type()>task(f);
std::future<result_type>res(task.get_future());
if (local_work_queue)
local_work_queue->push(std::move(task));
else
pool_work_queue.push(std::move(task));
return res;
}
void run_pending_task()
{
function_wrapper task;
if (local_work_queue && !local_work_queue->empty()) {
task = std::move(local_work_queue->front());
local_work_queue->pop();
task();
}
else if (!pool_work_queue.empty()) {
task = std::move(pool_work_queue.front());
pool_work_queue.pop();
task();
}
else
std::this_thread::yield();
}
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++)
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
catch (...) {
done = true;
throw;
}
return;
}
~thread_pool()
{
done = true;
return;
}
};
使用本地隊列可以很好地降低對全局隊列的競爭,但是任務分布不均衡可能導致效率降低.
這樣就引出了工作竊取.應當允許線程在其他私有隊列中竊取工作.
工作竊取
為了允許一個空間的線程執行其他線程上的任務,每個工作線程的私有隊列必須在run_pending_task中竊取任務的時候可以被訪問到.
這要求每個工作線程將自己的私有任務隊列向線程池注冊,或每個線程都被線程池分配一個工作隊列.
此外,必須保證工作隊列中的數據被適當的同步與保護.
下面是一個支持工作竊取的線程池例子,與上面的例子一樣,其由教材提供并且存在大量錯誤.
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <type_traits>
#include <numeric>
#include <functional>
#include <algorithm>
#include <vector>
#include <list>
#include <queue>
class join_threads {
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
class function_wrapper {
struct impl_base {
virtual void call() = NULL;
virtual ~impl_base() { return; }
};
std::unique_ptr<impl_base>impl;
template<typename F>
struct impl_type : public impl_base {
F f;
impl_type(F&& f_) :
f(std::move(f_)) {
return;
}
void call()
{
f();
return;
}
};
public:
template<typename F>
function_wrapper(F&& f) :
impl(new impl_type<F>(std::move(f))) {
return;
}
function_wrapper() = default;
function_wrapper(function_wrapper&& other) :
impl(std::move(other.impl)) {
return;
}
void operator()()
{
impl->call();
return;
}
function_wrapper& operator= (function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
template<typename T>
class thread_safe_queue
{
private:
struct node {
std::shared_ptr<T>data;
std::unique_ptr<node>next;
};
std::mutex head_mutex;
std::unique_ptr<node>head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
node* get_tail()
{
std::lock_guard<std::mutex>tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node>pop_head()
{
std::unique_ptr<node>old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_ptr<node>try_pop_head()
{
std::lock_guard<std::mutex>head_lock(head_mutex);
if (head.get() == get_tail())
return std::unique_ptr<node>();
return pop_head();
}
std::unique_ptr<node>try_pop_head(T& value)
{
std::lock_guard<std::mutex>head_lock(head_mutex);
if (head.get() == get_tail())
return std::unique_ptr<node>();
value = std::move(*head->data);
return pop_head();
}
public:
thread_safe_queue() :
head(new node), tail(head.get()) { return; }
thread_safe_queue(const thread_safe_queue& other) = delete;
thread_safe_queue& operator=(const thread_safe_queue& other) = delete;
void push(T new_value)
{
std::shared_ptr<T>new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node>p(new node);
{
std::lock_guard<std::mutex>tail_lock(tail_mutex);
tail->data = new_data;
node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
data_cond.notify_one();
return;
}
std::shared_ptr<T>try_pop()
{
std::unique_ptr<node>old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<T>();
}
bool try_pop(T& value)
{
std::unique_ptr<node>const old_head = try_pop_head(value);
return old_head ? true : false;
}
void empty()
{
std::lock_guard<std::mutex>head_lock(head_mutex);
return (head.get() == get_tail());
}
};
class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type>the_queue;
mutable std::mutex the_mutex;
public:
work_stealing_queue() { return; }
work_stealing_queue(const work_stealing_queue& other) = delete;
work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data)
{
std::lock_guard<std::mutex>lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty()const
{
std::lock_guard<std::mutex>lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type& res)
{
std::lock_guard<std::mutex>lock(the_mutex);
if (the_queue.empty())
return false;
res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res)
{
std::lock_guard<std::mutex>lock(the_mutex);
if (the_queue.empty())
return false;
res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
class thread_pool
{
typedef function_wrapper task_type;
std::atomic_bool done;
thread_safe_queue<task_type>pool_work_queue;
std::vector<std::unique_ptr<work_stealing_queue>>queues;
std::vector<std::thread>threads;
join_threads joiner;
static thread_local work_stealing_queue* local_work_queue;
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_)
{
my_index = my_index_;
local_work_queue = queues[my_index].get();
while (!done)
run_pending_task();
return;
}
bool pop_task_from_local_queue(task_type& task)
{
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task)
{
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type& task)
{
for (unsigned i = 0; i < queues.size(); i++) {
unsigned const index = (my_index + i + 1) % queues.size();
if (queues[index]->try_steal(task))
return true;
return false;
}
}
public:
thread_pool() :
done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i++) {
queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
threads.push_back(std::thread(&thread_pool::worker_thread, this, i));
}
}
catch (...) {
done = true;
throw;
}
}
~thread_pool()
{
done = true;
}
template<typename FunctionType>
std::future<typename std::invoke_result<FunctionType()>::type>submit(FunctionType f)
{
typedef typename std::invoke_result<FunctionType()>::type result_type;
std::packaged_task<result_type()>task(f);
std::future<result_type>res(task.get_future());
if (local_work_queue)
local_work_queue->push(std::move(task));
else
pool_work_queue.push(std::move(task));
return res;
}
void run_pending_task()
{
task_type task;
if (pop_task_from_local_queue(task) ||
pop_task_from_pool_queue(task) ||
pop_task_from_other_thread_queue(task))
task();
else
std::this_thread::yield();
return;
}
};
void test_func()
{
std::cout << "this is from:" << std::this_thread::get_id() << std::endl;
return;
}
int main()
{
thread_pool pool;
try {
for (int i = 0; i < 10; i++)
pool.submit([&]() { test_func(); });
}
catch (const std::exception& e) {
std::cerr << "An exception occurred: " << e.what() << '\n';
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return 0;
}
到此,不得不說這本書這一章簡直處處都是災難.
也不知道本書第二版中是否將這些錯誤修正.
中斷線程
在許多場景中,向一個長時間運行的線程發出一個信號告訴線程停止執行是一個可能的行為.
這有可能是因為線程是一個工作線程,而線程池正在被銷毀,或者是因為線程正在執行的工作被用戶取消了.
不管是什么原因,其基本思想是一樣的,你需要從一個線程發送一個信號告訴另一個線程應該停止運行.
而且,你同樣需要讓線程適當地結束而不是簡單地退出而造成線程池不一致的狀態.
啟動和中斷另一個線程
通過flag的方式,你可以設計指定的中斷點.這是通過std::future實現的.
class interrupt_flag
{
public:
void set();
bool is_set()const;
};
thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
std::promise<interrupt_flag*>p;
internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag),
f();
}
);
flag = p.get_future().get();
}
void iterrupt()
{
if (flag)
flag->set();
return;
}
};
檢測一個線程是否被中斷
下面給出了一段偽代碼,來演示中斷點的設計.
void interruption_point()
{
if (this_thread_interrupt_flag.is_set())
throw this_thread_interrupted();
}
中斷等待條件變量
現在可以顯式調用interruption_point()來檢測中斷了.
然而,為了一個阻塞等待的時間,需要設計一個新的函數interruptible_wait().
這個函數關于條件變量的實現如下:
class interrupt_flag
{
std::atomic<bool>flag;
std::condition_variable* thread_cond;
std::mutex set_clear_mutex;
public:
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex>lk(set_clear_mutex);
if (thread_cond)
thread_cond->notify_all();
return;
}
bool is_set()const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex>lk(set_clear_mutex);
thread_cond = &cv;
return;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex>lk(set_clear_mutex);
thread_cond = 0;
return;
}
struct clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
this_thread_interrupt_flag.clear_condition_variable();
return;
}
};
};
thread_local interrupt_flag this_thread_interrupt_flag;
void interruption_point()
{
if (this_thread_interrupt_flag.is_set())
throw "this_thread_interrupted()";
}
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk, std::chrono::milliseconds(1));
interruption_point();
return;
}
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk, Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
while (!this_thread_interrupt_flag.is_set() && !pred())
cv.wait_for(lk, std::chrono::milliseconds(1));
interruption_point();
return;
}
中斷在std::condition_variable_any上的等待
std::conditon_variable可以與任何鎖類型配合工作,這使得其更為靈活.
下面是使用condtion_variable的版本:
class interrupt_flag
{
std::atomic<bool>flag;
std::condition_variable* thread_cond;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag() :
thread_cond(0), thread_cond_any(0) {
return;
}
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex>lk(set_clear_mutex);
if (thread_cond)
thread_cond->notify_all();
else if (thread_cond_any)
thread_cond_any->notify_all();
return;
}
bool is_set()const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex>lk(set_clear_mutex);
thread_cond = &cv;
return;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex>lk(set_clear_mutex);
thread_cond = 0;
return;
}
struct clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
this_thread_interrupt_flag.clear_condition_variable();
return;
}
};
template<typename Lockable>
void wait(std::condition_variable_any& cv, Lockable& lk)
{
struct custom_lock
{
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) :
self(self_), lk(lk_)
{
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
return;
}
~custom_lock()
{
self->thread_cond_any = 0;
self->set_clear_mutex.unlock();
return;
}
void unlock()
{
lk.unlock();
self->set_clear_mutex.unlock();
return;
}
void lock()
{
std::lock(self->set_clear_mutex, lk);
return;
}
};
custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
};
thread_local interrupt_flag this_thread_interrupt_flag;
template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk)
{
this_thread_interrupt_flag.wait(cv, lk);
return;
}
處理中斷
從被中斷的線程的角度來看,一個中斷只是一個thread_interrupted異常.
這可以像其他異常一樣進行處理.典型的操作是可以使用一個標準的catch塊捕獲它.
為了避免被迫記得每個你傳遞到interruptible_thread中的函數中放一個catch.你可以將此catch快放到你初始化interrupt_flag的包裝器中.
就像是:
internal_thread=std::thread([f,&p]{
p.set_value(&this_thread_interrupt_flag);
try{
f();
}
catch(thread_interrupted const&){
}
return;
}
);
終于,這噩夢一般的第九章到此結束,本書最核心的內容也到此結束.C++多線程系列收工.

浙公網安備 33010602011771號