<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      C++一些新的特性的理解(二)

      1 C++11多線程thread

      重點:

      • join和detach的使用場景
      • thread構造函數參數
      • 綁定c函數
      • 綁定類函數
      • 線程封裝基礎類
      • 互斥鎖mutex
      • condition notify、wait
      • lock_guard/unique_lock
      • function和bind
      • 異步future/packaged_task/promise
      • 線程池的實現,線程池涉及的技術點

      1.1 thread

      std::thread 在 #include 頭文件中聲明,因此使用 std::thread 時需要包含 #include 頭文件。

      1.1.1語法

      構造函數

      • 默認構造函數
      //創建一個空的thread執行對象。
      thread() _NOEXCEPT
      { // construct with no thread
      _Thr_set_null(_Thr);
      }
      
      • 初始化構造函數
      //創建std::thread執行對象,該thread對象可被joinable,新產生的線程會調用threadFun函數,該函
      數的參數由 args 給出
      template<class Fn, class... Args>
      explicit thread(Fn&& fn, Args&&... args);
      
      • 拷貝構造函數
      // 拷貝構造函數(被禁用),意味著 thread 不可被拷貝構造。
      thread(const thread&) = delete;
      
      • Move構造函數
      /move 構造函數,調用成功之后 x 不代表任何 thread 執行對象。
      注意:可被 joinable 的 thread 對象必須在他們銷毀之前被主線程 join 或者將其設置為
      detached。
      thread(thread&& x)noexcept
      
      #include<iostream>
      #include<thread>
      using namespace std;
      void threadFun(int& a) // 引用傳遞
      {
      	cout << "this is thread fun !" << endl;
      	cout << " a = " << (a += 10) << endl;
      }
      int main()
      {
      	int x = 10;
      	thread t1(threadFun, std::ref(x));
      	thread t2(std::move(t1)); // t1 線程失去所有權
      	thread t3;
      	t3 = std::move(t2); // t2 線程失去所有權
      	//t1.join(); // ?
      	t3.join();
      	cout << "Main End " << "x = " << x << endl;
      	return 0;
      }
      

      主要函數

      1.1.2 簡單線程的創建

      1. 傳入0個值
      2. 傳入2個值
      3. 傳入引用
      4. 傳入類函數
      5. detach
      6. move
      #include<iostream>
      #include<thread>
      using namespace std;
      //1 傳入0個值
      void func1()
      {
      	cout << "func1 into" << endl;
      }
      //2 傳入2個值
      void func2(int a,int b)
      {
      	cout << "func2 a + b =" << a + b << endl;
      }
      
      //3 傳入引用
      void func3(int &c)
      {
      	cout << "func3 c = " << &c << endl;
      }
      
      class A
      {
      public:
      	//4.傳入類函數
      	void func4(int a)
      	{
      		cout << "thread:" << _name << " ,func a=" << a << endl;
      	}
      	void setName(string name)
      	{
      		_name = name;
      	}
      	void displayName()
      	{
      		cout << "this:" << this << ",name:" << _name << endl;
      	}
      	void play()
      	{
      		std::cout << "play call!" << std::endl;
      	}
      private:
      	string _name;
      };
      //5.detach
      void func5()
      {
      	cout << "func5 into sleep" << endl;
      	std::this_thread::sleep_for(std::chrono::seconds(1));
      	cout << "func 5 leave" << endl;
      }
      
      // 6. move
      void func6()
      {
      	cout << "this is func6 !" << endl;
      }
      
      int main()
      {
      	//1.傳入0個值
      	cout << "\n\n main1-----------------------------------\n";
      	std::thread t1(&func1);
      	t1.join();
      	//2.傳入2個值
      	cout << "\n\n main2-----------------------------------\n";
      	int a = 10;
      	int b = 20;
      	std::thread t2(&func2, a, b);
      	t2.join();
      	//3.傳入引用
      	cout << "\n\n main3-----------------------------------\n";
      	int c = 10;
      	std::thread t3(func3, std::ref(c));
      	t3.join();
      	cout << "main c =" << &c << ", " << c << endl;
      	//4.傳入類函數
      	cout << "\n\n main4-----------------------------------\n";
      	std::unique_ptr<A> a4_ptr = make_unique<A>();
      	a4_ptr->setName("daren");
      	std::thread t4(&A::func4, a4_ptr.get(), 10);
      	t4.join();
      	//5.detach
      	cout << "\n\n main5-----------------------------------\n";
      	std::thread t5(&func5);
      	t5.detach();
      	cout << "\n main5 end\n";
      	// 6.move
      	cout << "\n\n main6--------------------------\n";
      	int x = 10;
      	thread t6_1(func6);
      	thread t6_2(std::move(t6_1)); // t6_1 線程失去所有權
      
      	t6_2.join();
      
      
      	return 0;
      }
      

      1.1.3 線程封裝

      zero_thread.h

      #ifndef ZERO_THREAD_H
      #define ZERO_THREAD_H
      #include <thread>
      class ZERO_Thread	
      {
      public:
      ZERO_Thread(); // 構造函數
      virtual ~ZERO_Thread(); // 析構函數
      bool start();
      void stop();
      bool isAlive() const; // 線程是否存活.
      std::thread::id id() { return th_->get_id(); }
      std::thread* getThread() { return th_; }
      void join(); // 等待當前線程結束, 不能在當前線程上調用
      void detach(); //能在當前線程上調用
      static size_t CURRENT_THREADID();
      protected:
      void threadEntry();
      virtual void run() = 0; // 運行
      protected:
      bool running_; //是否在運行
      std::thread* th_;
      };
      #endif // ZERO_THREAD_H
      

      zero_thread.cpp

      #include "ZERO_Thread.h"
      #include "zero_thread.h"
      #include <sstream>
      #include <iostream>
      #include <exception>
      ZERO_Thread::ZERO_Thread() :
      	running_(false), th_(NULL)
      {
      }
      ZERO_Thread::~ZERO_Thread()
      {
      	if (th_ != NULL)
      	{
      		//如果到調用析構函數的時候,調用者還沒有調用join則觸發detach,此時是一個比較危險的動作,用戶必須知道他在做什么
      			if (th_->joinable())
      			{
      				std::cout << "~ZERO_Thread detach\n";
      				th_->detach();
      			}
      		delete th_;
      		th_ = NULL;
      	}
      	std::cout << "~ZERO_Thread()" << std::endl;
      }
      bool ZERO_Thread::start()
      {
      	if (running_)
      	{
      		return false;
      	}
      	try
      	{
      		th_ = new std::thread(& ZERO_Thread::threadEntry, this);
      	}
      	catch (...)
      	{
      		throw "[ZERO_Thread::start] thread start error";
      	}
      	return true;
      }
      void ZERO_Thread::stop()
      {
      	running_ = false;
      }
      bool ZERO_Thread::isAlive() const
      {
      	return running_;
      }
      void ZERO_Thread::join()
      {
      	if (th_->joinable())
      	{
      		th_->join(); // 不是detach才去join
      	}
      }
      void ZERO_Thread::detach()
      {
      	th_->detach();
      }
      size_t ZERO_Thread::CURRENT_THREADID()
      {
      	// 聲明為thread_local的本地變量在線程中是持續存在的,不同于普通臨時變量的生命周期,
      	// 它具有static變量一樣的初始化特征和生命周期,即使它不被聲明為static。
      	static thread_local size_t threadId = 0;
      	if (threadId == 0)
      	{
      		std::stringstream ss;
      		ss << std::this_thread::get_id();
      		threadId = strtol(ss.str().c_str(), NULL, 0);
      	}
      	return threadId;
      }
      void ZERO_Thread::threadEntry()
      {
      	running_ = true;
      	try
      	{
      		run(); // 函數運行所在
      	}	
      	catch (std::exception& ex)
      	{
      		running_ = false;
      		throw ex;
      	}
      	catch (...)
      	{
      		running_ = false;
      		throw;
      	}
      	running_ = false;
      }
      
      

      main.cpp

      #include <iostream>
      #include <chrono>
      #include "zero_thread.h"
      using namespace std;
      class A : public ZERO_Thread
      {
      public:
      	void run()
      	{
      		while (running_)
      		{
      			cout << "print A " << endl;
      			std::this_thread::sleep_for(std::chrono::seconds(5));
      		}
      		cout << "----- leave A " << endl;
      	}
      };
      class B : public ZERO_Thread
      {
      public:
      	void run()
      	{
      		while (running_)
      		{
      			cout << "print B " << endl;
      			std::this_thread::sleep_for(std::chrono::seconds(2));
      		}
      		cout << "----- leave B " << endl;
      	}
      };
      int main()
      {
      	{
      		A a;
      		a.start();
      		B b;
      		b.start();
      		std::this_thread::sleep_for(std::chrono::seconds(5));
      		a.stop();
      		a.join();
      		b.stop();
      		b.join();
      	}
      	cout << "Hello World!" << endl;
      	return 0;
      }
      

      1.1.4 進一步閱讀

      https://cplusplus.com/reference/thread/this_thread/

      • get_id
        Get thread id (function )
      • yield
        Yield to other threads (function )
      • sleep_until
        Sleep until time point (function )
      • sleep_for
        Sleep for time span (function )

      1.2互斥量

      mutex又稱互斥量,C++ 11中與 mutex相關的類(包括鎖類型)和函數都聲明在 頭文件中,所以如果
      你需要使用 std::mutex,就必須包含 頭文件。
      C++11提供如下4種語義的互斥量(mutex)

      • std::mutex,獨占的互斥量,不能遞歸使用。
      • std::time_mutex,帶超時的獨占互斥量,不能遞歸使用。
      • std::recursive_mutex,遞歸互斥量,不帶超時功能。
      • std::recursive_timed_mutex,帶超時的遞歸互斥量。

      1.2.1 獨占互斥量std::mutex

      std::mutex介紹

      下面以 std::mutex 為例介紹 C++11 中的互斥量用法。
      std::mutex 是C++11 中最基本的互斥量,std::mutex 對象提供了獨占所有權的特性——即不支持遞歸地
      對 std::mutex 對象上鎖,而 std::recursive_lock 則可以遞歸地對互斥量對象上鎖。

      std::mutex 的成員函數

      • 構造函數,std::mutex不允許拷貝構造,也不允許 move 拷貝,最初產生的mutex對象是處于unlocked 狀態的。
      • lock(),調用線程將鎖住該互斥量。線程調用該函數會發生下面 3 種情況:(1). 如果該互斥量當前沒有被鎖住,則調用線程將該互斥量鎖住,直到調用 unlock之前,該線程一直擁有該鎖。(2). 如果當前互斥量被其他線程鎖住,則當前的調用線程被阻塞住。(3). 如果當前互斥量被當前調用線程鎖住,則會產生死鎖(deadlock)。
      • unlock(), 解鎖,釋放對互斥量的所有權。
      • try_lock(),嘗試鎖住互斥量,如果互斥量被其他線程占有,則當前線程也不會被阻塞。線程調用該函數也會出現下面 3 種情況,(1). 如果當前互斥量沒有被其他線程占有,則該線程鎖住互斥量,直到該線程調用 unlock 釋放互斥量。(2). 如果當前互斥量被其他線程鎖住,則當前調用線程返回false,而并不會被阻塞掉。(3). 如果當前互斥量被當前調用線程鎖住,則會產生死鎖(deadlock)。

      范例1-2-mutex1

      #include <iostream>       // std::cout
      #include <thread>         // std::thread
      #include <mutex>          // std::mutex
      
      volatile int counter(0); // non-atomic counter
      std::mutex mtx;           // locks access to counter
      
      void increases_10k()
      {
          for (int i=0; i<10000; ++i) {
              // 1. 使用try_lock的情況
              if (mtx.try_lock()) {   // only increase if currently not locked:
                  ++counter;
                  mtx.unlock();
              }
              // 2. 使用lock的情況
              //        {
              //            mtx.lock();
              //            ++counter;
              //            mtx.unlock();
              //        }
          }
      }
      
      int main()
      {
          std::thread threads[10];
          for (int i=0; i<10; ++i)
              threads[i] = std::thread(increases_10k);
      
          for (auto& th : threads) th.join();
          std::cout << " successful increases of the counter "  << counter << std::endl;
      
          return 0;
      }
      

      1.2.2 遞歸互斥量std::recursive_mutex

      遞歸鎖允許同一個線程多次獲取該互斥鎖,可以用來解決同一線程需要多次獲取互斥量時死鎖的問題。

      死鎖范例1-2-mutex2-dead-lock

      //死鎖范例1-2-mutex2-dead-lock
      #include <iostream>
      #include <thread>
      #include <mutex>
      
      struct Complex
      {
          std::mutex mutex;
          int i;
      
          Complex() : i(0){}
      
          void mul(int x)
          {
              std::lock_guard<std::mutex> lock(mutex);
              i *= x;
          }
      
          void div(int x)
          {
              std::lock_guard<std::mutex> lock(mutex);
              i /= x;
          }
      
          void both(int x, int y)
          {
              //lock_guard 構造函數加鎖, 析構函數釋放鎖
              std::lock_guard<std::mutex> lock(mutex);
              mul(x); // 獲取不了鎖
              div(y);
          }
      
          void init()
          {
              //lock_guard 構造函數加鎖, 析構函數釋放鎖
              std::lock_guard<std::mutex> lock(mutex);
              sub_init();
          }
          void sub_init()
          {
              std::lock_guard<std::mutex> lock(mutex);
          }
      };
      
      int main(void)
      {
          Complex complex;
      
          complex.both(32, 23);
          std::cout << "main finish\n";
          return 0;
      }
      
      

      運行后出現死鎖的情況。在調用both時獲取了互斥量,在調用mul時又要獲取互斥量,但both的并沒有
      釋放,從而產生死鎖。
      使用遞歸鎖

      //遞歸鎖1-2-recursive_mutex1
      #include <iostream>
      #include <thread>
      #include <mutex>
      
      struct Complex
      {
          std::recursive_mutex mutex;
          int i;
      
          Complex() : i(0){}
      
          void mul(int x)
          {
              std::lock_guard<std::recursive_mutex> lock(mutex);
              i *= x;
          }
      
          void div(int x)
          {
              std::unique_lock<std::recursive_mutex> lock(mutex);
      
      
              i /= x;
          }
      
          void both(int x, int y)
          {
              std::lock_guard<std::recursive_mutex> lock(mutex);
              mul(x);
              div(y);
          }
      };
      
      int main(void)
      {
          Complex complex;
      
          complex.both(32, 23);  //因為同一線程可以多次獲取同一互斥量,不會發生死鎖
      
          std::cout << "main finish\n";
          return 0;
      }
      
      

      雖然遞歸鎖能解決這種情況的死鎖問題,但是盡量不要使用遞歸鎖,主要原因如下:

      1. 需要用到遞歸鎖的多線程互斥處理本身就是可以簡化的,允許遞歸很容易放縱復雜邏輯的產生,并
        且產生晦澀,當要使用遞歸鎖的時候應該重新審視自己的代碼是否一定要使用遞歸鎖;
      2. 遞歸鎖比起非遞歸鎖,效率會低;
      3. 遞歸鎖雖然允許同一個線程多次獲得同一個互斥量,但可重復獲得的最大次數并未具體說明,一旦
        超過一定的次數,再對lock進行調用就會拋出std::system錯誤。

      1.2.3 帶超時的互斥量std::timed_mutex和std::recursive_timed_mutex

      std::timed_mutex比std::mutex多了兩個超時獲取鎖的接口:try_lock_for和try_lock_until

      #include <iostream>
      #include <thread>
      #include <mutex>
      #include <chrono>
      std::timed_mutex mutex;
      void work()
      {
      std::chrono::milliseconds timeout(100);
      while (true)
      {
      if (mutex.try_lock_for(timeout))
      {
      std::cout << std::this_thread::get_id() << ": do work with the
      mutex" << std::endl;
      std::chrono::milliseconds sleepDuration(250);
      std::this_thread::sleep_for(sleepDuration);
      mutex.unlock();
      std::this_thread::sleep_for(sleepDuration);
      }
      else
      {
      std::cout << std::this_thread::get_id() << ": do work without the
      mutex" << std::endl;
      std::chrono::milliseconds sleepDuration(100);
      std::this_thread::sleep_for(sleepDuration);
      }
      }
      }
      int main(void)
      {
      std::thread t1(work);
      std::thread t2(work);
      t1.join();
      t2.join();
      std::cout << "main finish\n";
      return 0;
      }
      

      1.2.4 lock_guard和unique_lock的使用和區別

      相對于手動lock和unlock,我們可以使用RAII(通過類的構造析構)來實現更好的編碼方式。
      RAII:也稱為“資源獲取就是初始化”,是c++等編程語言常用的管理資源、避免內存泄露的方法。它保證
      在任何情況下,使用對象時先構造對象,最后析構對象。

      1 unique_lock,lock_guard的使用

      這里涉及到unique_lock,lock_guard的使用。

      #include <iostream>       // std::cout
      #include <thread>         // std::thread
      #include <mutex>          // std::mutex, std::lock_guard
      #include <stdexcept>      // std::logic_error
      
      std::mutex mtx;
      
      void print_even (int x) {
          if (x%2==0) std::cout << x << " is even\n";
          else throw (std::logic_error("not even"));
      }
      
      void print_thread_id (int id) {
          try {
      //        這里的lock_guard換成unique_lock是一樣的。
              // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
              std::lock_guard<std::mutex> lck (mtx);
              print_even(id);
          }
          catch (std::logic_error&) {
              std::cout << "[exception caught]\n";
          }
      }
      
      int main ()
      {
          std::thread threads[10];
          // spawn 10 threads:
          for (int i=0; i<10; ++i)
              threads[i] = std::thread(print_thread_id,i+1);
      
          for (auto& th : threads) th.join();
      
          return 0;
      }
      
      

      2 unique_lock,lock_guard的區別

      • unique_lock與lock_guard都能實現自動加鎖和解鎖,但是前者更加靈活,能實現更多的功能。
      • unique_lock可以進行臨時解鎖和再上鎖,如在構造對象之后使用lck.unlock()就可以進行解鎖,lck.lock()進行上鎖,而不必等到析構時自動解鎖。
      #include <iostream>
      #include <deque>
      #include <thread>
      #include <mutex>
      #include <condition_variable>
      #include <unistd.h>
      
      std::deque<int> q;
      std::mutex mu;
      std::condition_variable cond;
      int count = 0;
      
      void fun1() {
          while (true) {
      //        {
              std::unique_lock<std::mutex> locker(mu); // 能否換成lock_guard
              q.push_front(count++);
              locker.unlock();        // 這里是不是必須的?
              cond.notify_one();
      //        }
              sleep(1);
          }
      }
      
      void fun2() {
          while (true) {
              std::unique_lock<std::mutex> locker(mu);
              cond.wait(locker, [](){return !q.empty();});
              auto data = q.back();
              q.pop_back();
      //        locker.unlock(); // 這里是不是必須的?
              std::cout << "thread2 get value form thread1: " << data << std::endl;
          }
      }
      int main() {
          std::thread t1(fun1);
          std::thread t2(fun2);
          t1.join();
          t2.join();
          return 0;
      }
      

      條件變量的目的就是為了,在沒有獲得某種提醒時長時間休眠; 如果正常情況下, 我們需要一直循環
      (+sleep), 這樣的問題就是CPU消耗+時延問題,條件變量的意思是在cond.wait這里一直休眠直到
      cond.notify_one喚醒才開始執行下一句; 還有cond.notify_all()接口用于喚醒所有等待的線程。

      那么為什么必須使用unique_lock呢?

      原因: 條件變量在wait時會進行unlock再進入休眠, lock_guard并無該操作接口
      wait: 如果線程被喚醒或者超時那么會先進行lock獲取鎖, 再判斷條件(傳入的參數)是否成立, 如果成立則wait函數返回否則釋放鎖繼續休眠
      notify:進行notify動作并不需要獲取鎖
      使用場景:需要結合notify+wait的場景使用unique_lock; 如果只是單純的互斥使用lock_guard

      3 總結

      lock_guard

      1. std::lock_guard 在構造函數中進行加鎖,析構函數中進行解鎖。
      2. 鎖在多線程編程中,使用較多,因此c++11提供了lock_guard模板類;在實際編程中,我們也可以根
        據自己的場景編寫resource_guard RAII類,避免忘掉釋放資源。

      std::unique_lock

      1. unique_lock 是通用互斥包裝器,允許延遲鎖定、鎖定的有時限嘗試、遞歸鎖定、所有權轉移和與條件變量一同使用。
      2. unique_lock比lock_guard使用更加靈活,功能更加強大。
      3. 使用unique_lock需要付出更多的時間、性能成本。

      1.3 條件變量

      互斥量是多線程間同時訪問某一共享變量時,保證變量可被安全訪問的手段。但單靠互斥量無法實現線
      程的同步。線程同步是指線程間需要按照預定的先后次序順序進行的行為。C++11對這種行為也提供了
      有力的支持,這就是條件變量。條件變量位于頭文件condition_variable下。
      http://www.cplusplus.com/reference/condition_variable/condition_variable
      條件變量使用過程:

      1. 擁有條件變量的線程獲取互斥量;
      2. 循環檢查某個條件,如果條件不滿足則阻塞直到條件滿足;如果條件滿足則向下執行;
      3. 某個線程滿足條件執行完之后調用notify_one或notify_all喚醒一個或者所有等待線程。

      1.3.1 成員函數

      1 wait函數

      函數原型

      void wait (unique_lock<mutex>& lck);
      template <class Predicate>
      void wait (unique_lock<mutex>& lck, Predicate pred);
      

      包含兩種重載,第一種只包含unique_lock對象,另外一個Predicate 對象(等待條件),這里必須使用
      unique_lock,因為wait函數的工作原理:

      • 當前線程調用wait()后將被阻塞并且函數會解鎖互斥量,直到另外某個線程調用notify_one或者
        notify_all喚醒當前線程;一旦當前線程獲得通知(notify),wait()函數也是自動調用lock(),同理不
        能使用lock_guard對象。
      • 如果wait沒有第二個參數,第一次調用默認條件不成立,直接解鎖互斥量并阻塞到本行,直到某一
        個線程調用notify_one或notify_all為止,被喚醒后,wait重新嘗試獲取互斥量,如果得不到,線程
        會卡在這里,直到獲取到互斥量,然后無條件地繼續進行后面的操作。
      • 如果wait包含第二個參數,如果第二個參數不滿足,那么wait將解鎖互斥量并堵塞到本行,直到某
        一個線程調用notify_one或notify_all為止,被喚醒后,wait重新嘗試獲取互斥量,如果得不到,線
        程會卡在這里,直到獲取到互斥量,然后繼續判斷第二個參數,如果表達式為false,wait對互斥
        量解鎖,然后休眠,如果為true,則進行后面的操作。

      2 wait_for函數

      函數原型:

      template <class Rep, class Period>
      cv_status wait_for (unique_lock<mutex>& lck,
      const chrono::duration<Rep,Period>& rel_time);
      template <class Rep, class Period, class Predicate>
      bool wait_for (unique_lock<mutex>& lck,
      const chrono::duration<Rep,Period>& rel_time, Predicate
      pred);
      

      和wait不同的是,wait_for可以執行一個時間段,在線程收到喚醒通知或者時間超時之前,該線程都會
      處于阻塞狀態,如果收到喚醒通知或者時間超時,wait_for返回,剩下操作和wait類似。

      3 wait_until函數

      函數原型:

      template <class Clock, class Duration>
      cv_status wait_until (unique_lock<mutex>& lck,
      const chrono::time_point<Clock,Duration>& abs_time);
      template <class Clock, class Duration, class Predicate>
      bool wait_until (unique_lock<mutex>& lck,
      const chrono::time_point<Clock,Duration>& abs_time,
      Predicate pred);
      

      與wait_for類似,只是wait_until可以指定一個時間點,在當前線程收到通知或者指定的時間點超時之
      前,該線程都會處于阻塞狀態。如果超時或者收到喚醒通知,wait_until返回,剩下操作和wait類似

      4 notify_one函數

      函數原型:

      void notify_one() noexcept;
      

      解鎖正在等待當前條件的線程中的一個,如果沒有線程在等待,則函數不執行任何操作,如果正在等待
      的線程多余一個,則喚醒的線程是不確定的。

      5 notify_all函數

      函數原型:

      void notify_all() noexcept;
      

      解鎖正在等待當前條件的所有線程,如果沒有正在等待的線程,則函數不執行任何操作.

      1.3.2 范例

      使用條件變量實現一個同步隊列,同步隊列作為一個線程安全的數據共享區,經常用于線程之間數據讀
      取。

      sync_queue.h

      //同步隊列的實現1-3-condition-sync-queue
      #ifndef SYNC_QUEUE_H
      #define SYNC_QUEUE_H
      #include<list>
      #include<mutex>
      #include<thread>
      #include<condition_variable>
      #include <iostream>
      template<typename T>
      class SyncQueue
      {
      private:
      bool IsFull() const
      {
      return _queue.size() == _maxSize;
      }
      bool IsEmpty() const
      {
      return _queue.empty();
      }
      public:
      SyncQueue(int maxSize) : _maxSize(maxSize)
      {
      }
      void Put(const T& x)
      {
      std::lock_guard<std::mutex> locker(_mutex);
      while (IsFull())
      {
      std::cout << "full wait..." << std::endl;
      _notFull.wait(_mutex);
      }
      _queue.push_back(x);
      _notFull.notify_one();
      }
      void Take(T& x)
      {
      std::lock_guard<std::mutex> locker(_mutex);
      while (IsEmpty())
      {
      std::cout << "empty wait.." << std::endl;
      _notEmpty.wait(_mutex);
      }
      x = _queue.front();
      _queue.pop_front();
      _notFull.notify_one();
      }
      bool Empty()
      {
      std::lock_guard<std::mutex> locker(_mutex);
      return _queue.empty();
      }
      bool Full()
      {
      std::lock_guard<std::mutex> locker(_mutex);
      return _queue.size() == _maxSize;
      }
      size_t Size()
      {
      std::lock_guard<std::mutex> locker(_mutex);
      return _queue.size();
      }
      int Count()
      {
      return _queue.size();
      }
      private:
      std::list<T> _queue; //緩沖區
      std::mutex _mutex; //互斥量和條件變量結合起來使用
      std::condition_variable_any _notEmpty;//不為空的條件變量
      std::condition_variable_any _notFull; //沒有滿的條件變量
      int _maxSize; //同步隊列最大的size
      };
      #endif // SYNC_QUEUE_H
      

      main.cpp

      #include <iostream>
      #include "sync_queue.h"
      #include <thread>
      #include <iostream>
      #include <mutex>
      using namespace std;
      SyncQueue<int> syncQueue(5);
      void PutDatas()
      {
      for (int i = 0; i < 20; ++i)
      {
      syncQueue.Put(888);
      }
      std::cout << "PutDatas finish\n";
      }
      void TakeDatas()
      {
      int x = 0;
      for (int i = 0; i < 20; ++i)
      {
      syncQueue.Take(x);
      std::cout << x << std::endl;
      }
      std::cout << "TakeDatas finish\n";
      }
      int main(void)
      {
      std::thread t1(PutDatas);
      std::thread t2(TakeDatas);
      t1.join();
      t2.join();
      std::cout << "main finish\n";
      return 0;
      }
      

      代碼中用到了std::lock_guard,它利用RAII機制可以保證安全釋放mutex。

      std::lock_guard<std::mutex> locker(_mutex);
      while (IsFull())
      {
      std::cout << "full wait..." << std::endl;
      _notFull.wait(_mutex);
      }
      

      可以改成

      std::lock_guard<std::mutex> locker(_mutex);
      _notFull.wait(_mutex, [this] {return !IsFull();});
      

      兩種寫法效果是一樣的,但是后者更簡潔,條件變量會先檢查判斷式是否滿足條件,如果滿足條件則重
      新獲取mutex,然后結束wait繼續往下執行;如果不滿足條件則釋放mutex,然后將線程置為waiting狀
      態繼續等待。
      這里需要注意的是,wait函數中會釋放mutex,而lock_guard這時還擁有mutex,它只會在出了作用域
      之后才會釋放mutex,所以這時它并不會釋放,但執行wait時會提前釋放mutex

      從語義上看這里使用lock_guard會產生矛盾,但是實際上并不會出問題,因為wait提前釋放鎖之后會處
      于等待狀態,在被notify_one或者notify_all喚醒后會先獲取mutex,這相當于lock_guard的mutex在
      釋放之后又獲取到了
      ,因此,在出了作用域之后lock_guard自動釋放mutex不會有問題。
      這里應該用unique_lock,因為unique_lock不像lock_guard一樣只能在析構時才釋放鎖,它可以隨時釋
      放鎖,因此在wait時讓unique_lock釋放鎖從語義上更加準確。
      使用unique_lock和condition_variable_variable改寫condition-sync-queue,改寫為用等待一個判斷式的方法來實現一個簡單的隊列。

      condition-sync-queue2

      #ifndef SIMPLE_SYNC_QUEUE_H
      #define SIMPLE_SYNC_QUEUE_H
      #include <thread>
      #include <condition_variable>
      #include <mutex>
      #include <list>
      #include <iostream>
      
      template<typename T>
      class SimpleSyncQueue
      {
      public:
          SimpleSyncQueue(){}
      
          void Put(const T& x)
          {
              std::lock_guard<std::mutex> locker(_mutex);
              _queue.push_back(x);
              _notEmpty.notify_one();
          }
      
          void Take(T& x)
          {
              std::unique_lock<std::mutex> locker(_mutex);
              _notEmpty.wait(locker, [this]{return !_queue.empty(); });
      
              x = _queue.front();
              _queue.pop_front();
          }
      
          bool Empty()
          {
              std::lock_guard<std::mutex> locker(_mutex);
              return _queue.empty();
          }
      
          size_t Size()
          {
              std::lock_guard<std::mutex> locker(_mutex);
              return _queue.size();
          }
      
      private:
          std::list<T> _queue;
          std::mutex _mutex;
          std::condition_variable _notEmpty;
      };
      #endif // SIMPLE_SYNC_QUEUE_H
      
      

      main.cpp

      #include <iostream>
      #include "sync_queue.h"
      #include <thread>
      #include <iostream>
      #include <mutex>
      using namespace std;
      SimpleSyncQueue<int> syncQueue;
      
      void PutDatas()
      {
          for (int i = 0; i < 20; ++i)
          {
              syncQueue.Put(888);
          }
      }
      
      void TakeDatas()
      {
          int x = 0;
      
          for (int i = 0; i < 20; ++i)
          {
              syncQueue.Take(x);
              std::cout << x << std::endl;
          }
      }
      
      int main(void)
      {
          std::thread t1(PutDatas);
          std::thread t2(TakeDatas);
      
          t1.join();
          t2.join();
      
           std::cout << "main finish\n";
          return 0;
      }
      

      1.4 原子變量

      具體參考:http://www.cplusplus.com/reference/atomic/atomic/

      // atomic::load/store example
      #include <iostream> // std::cout
      #include <atomic> // std::atomic, std::memory_order_relaxed
      #include <thread> // std::thread
      //std::atomic<int> count = 0;//錯誤初始化
      std::atomic<int> count(0); // 準確初始化
      void set_count(int x)
      {
      std::cout << "set_count:" << x << std::endl;
      count.store(x, std::memory_order_relaxed); // set value atomically
      }
      void print_count()
      {
      int x;
      do {
      x = count.load(std::memory_order_relaxed); // get value atomically
      } while (x==0);
      std::cout << "count: " << x << '\n';
      }
      int main ()
      {
      std::thread t1 (print_count);
      std::thread t2 (set_count, 10);
      t1.join();
      t2.join();
      std::cout << "main finish\n";
      return 0;
      }
      

      1.5. 異步操作

      • std::future : 異步指向某個任務,然后通過future特性去獲取任務函數的返回結果。
      • std::aysnc: 異步運行某個任務函數
      • std::packaged_task :將任務和feature綁定在一起的模板,是一種封裝對任務的封裝。
      • std::promise
        參考C++官方手冊的范例。

      1.5.1 std::aysnc和std::future

      std::future期待一個返回,從一個異步調用的角度來說,future更像是執行函數的返回值,C++標準庫使用std::future為一次性事件建模,如果一個事件需要等待特定的一次性事件,那么這線程可以獲取一
      個future對象來代表這個事件。
      異步調用往往不知道何時返回,但是如果異步調用的過程需要同步,或者說后一個異步調用需要使用前
      一個異步調用的結果。這個時候就要用到future。
      線程可以周期性的在這個future上等待一小段時間,檢查future是否已經ready,如果沒有,該線程可以
      先去做另一個任務,一旦future就緒,該future就無法復位(無法再次使用這個future等待這個事
      件),所以future代表的是一次性事件。

      future的類型

      在庫的頭文件中聲明了兩種future,唯一future(std::future)和共享future(std::shared_future)這
      兩個是參照。
      std::unique_ptr和std::shared_ptr設立的,前者的實例是僅有的一個指向其關聯事件的實例,而后者可
      以有多個實例指向同一個關聯事件,當事件就緒時,所有指向同一事件的std::shared_future實例會變成
      就緒。

      future的使用

      std::future是一個模板,例如std::future,模板參數就是期待返回的類型,雖然future被用于線程間通
      信,但其本身卻并不提供同步訪問,熱門必須通過互斥元或其他同步機制來保護訪問。
      future使用的時機是當你不需要立刻得到一個結果的時候,你可以開啟一個線程幫你去做一項任務,并
      期待這個任務的返回,但是std::thread并沒有提供這樣的機制,這就需要用到std::async和std::future
      (都在頭文件中聲明)
      std::async返回一個std::future對象,而不是給你一個確定的值(所以當你不需要立刻使用此值的時候才
      需要用到這個機制)。當你需要使用這個值的時候,對future使用get(),線程就會阻塞直到future就
      緒,然后返回該值。

      #include<iostream>
      #include<future>
      #include<thread>
      using namespace std;
      
      int find_result_to_add()
      {
      	//std::this_thread::sleep_for(std::chrono::seconds(2));
      	std::cout << "find_result_to add" << std::endl;
      	return 1 + 1;
      }
      
      int find_result_to_add2(int a,int b)
      {
      	//std::this_thread::sleep_for(std::chrono::seconds(5));
      	return a + b;
      }
      
      void do_other_thing()
      {
      	std::cout << "do_other_things" << std::endl;
      }
      int main()
      {
      	std::future<int> result = std::async(find_result_to_add);
      	do_other_thing();
      	std::cout << "result: " << result.get() << std::endl;
      	auto result2 = std::async(find_result_to_add2, 10, 20);
      	std::cout << "result2:" << result2.get() << std::endl;
      	return 0;
      }
      

      跟thread類似,async允許你通過將額外的參數添加到調用中,來將附加參數傳遞給函數。如果傳入的
      函數指針是某個類的成員函數,則還需要將類對象指針傳入(直接傳入,傳入指針,或者是std::ref封
      裝)。
      默認情況下,std::async是否啟動一個新線程,或者在等待future時,任務是否同步運行都取決于你給的
      參數。這個參數為std::launch類型

      • std::launch::defered表明該函數會被延遲調用,直到在future上調用get()或者wait()為止
      • std::launch::async,表明函數會在自己創建的線程上運行
      • std::launch::any = std::launch::defered | std::launch::async
      • std::launch::sync = std::launch::defered
      enum class launch
      {
      async,deferred,sync=deferred,any=async|deferred
      };
      

      PS:默認選項參數被設置為std::launch::any。如果函數被延遲運行可能永遠都不會運行。

      1.5.2 std::packaged_task

      如果說std::async和std::feature還是分開看的關系的話,那么std::packaged_task就是將任務和feature
      綁定在一起的模板,是一種封裝對任務的封裝。
      The class template std::packaged_task wraps any Callable target (function, lambda expression,bind expression, or another function object) so that it can be invoked asynchronously. Its return value or exception thrown is stored in a shared state which can be accessed through std::future objects.

      可以通過std::packaged_task對象獲取任務相關聯的feature,調用get_future()方法可以獲得
      std::packaged_task對象綁定的函數的返回值類型的future。std::packaged_task的模板參數是函數簽
      名。

      PS:例如int add(int a, intb)的函數簽名就是int(int, int)

      #include <iostream>
      #include <future>
      using namespace std;
      int add(int a, int b, int c)
      {
      std::cout << "call add\n";
      return a + b + c;
      }
      void do_other_things()
      {
      std::cout << "do_other_things" << std::endl;
      }
      int main()
      {
      std::packaged_task<int(int, int, int)> task(add); // 封裝任務
      do_other_things();
      std::future<int> result = task.get_future();
      task(1, 1, 2); //必須要讓任務執行,否則在get()獲取future的值時會一直阻塞
      std::cout << "result:" << result.get() << std::endl;
      return 0;
      }
      

      1.5.3 std::promise

      std::promise提供了一種設置值的方式,它可以在這之后通過相關聯的std::future對象進行讀取。換種說法,之前已經說過std::future可以讀取一個異步函數的返回值了,那么這個std::promise就提供一種方式手動讓future就緒。

      //1-6-promise
      // std::promise和std::future配合,可以在線程之間傳遞數據。
      #include <future>
      #include <string>
      #include <thread>
      #include <iostream>
      using namespace std;
      void print(std::promise<std::string>& p)
      {
          p.set_value("There is the result whitch you want.");
      }
      
      void print2(std::promise<int>& p)
      {
          p.set_value(1);
      }
      
      void do_some_other_things()
      {
          std::cout << "Hello World" << std::endl;
      }
      
      int main()
      {
          std::promise<std::string> promise;
      
          std::future<std::string> result = promise.get_future();
          std::thread t(print, std::ref(promise));
          do_some_other_things();
          std::cout <<"result " << result.get() << std::endl;
          t.join();
      
          std::promise<int> promise2;
      
          std::future<int> result2 = promise2.get_future();
          std::thread t2(print2, std::ref(promise2));
          do_some_other_things();
          std::cout << "result2 " << result2.get() << std::endl;
          t2.join();
          return 0;
      }
      
      

      由此可以看出在promise創建好的時候future也已經創建好了
      線程在創建promise的同時會獲得一個future,然后將promise傳遞給設置他的線程,當前線程則持有
      future,以便隨時檢查是否可以取值.

      1.5.4 總結

      future的表現為期望,當前線程持有future時,期望從future獲取到想要的結果和返回,可以把future當
      做異步函數的返回值。而promise是一個承諾,當線程創建了promise對象后,這個promise對象向線程承諾他必定會被人設置一個值,和promise相關聯的future就是獲取其返回的手段。

      2 Function和bind用法

      在設計回調函數的時候,無可避免地會接觸到可回調對象。在C++11中,提供了std::function和std::bind兩個方法來對可回調函數進行統一和封裝。
      C++語言中有幾種可調用對象:lambda表達式、bind創建的對象以及重載了函數調用運算符的類。
      和其他對象一樣,可調用對象也有類型。例如,每個lambda有它自己唯一的(未命名)類類型;函數及函數指針的類型則由其返回值類型和實參類型決定。

      2.1 function的用法

      包含頭文件:#include

      1. 保存普通函數
      //保存普通函數
      void func1(int a)
      {
      cout << a << endl;
      }
      //1. 保存普通函數
      std::function<void(int a)> func;
      func = func1;
      func(2); //2
      

      2.保存lambda表達式

      std::function<void()> func_1 = [](){cout << "hello world" << endl;};
      func_1(); //hello world
      

      3.保存成員函數

      //保存成員函數
      class A{
      public:
      A(string name) : name_(name){}
      void func3(int i) const {cout <<name_ << ", " << i << endl;}
      private:
      string name_;
      };
      //3 保存成員函數
      std::function<void(const A&,int)> func3_ = &A::func3;
      A a("darren");
      func3_(a, 1);
      
      

      完整代碼:范例:2-1-function

      #include <iostream>
      #include <functional>
      using namespace std;
      //保存普通函數
      void func1(int a)
      {
      cout << a << endl;
      }
      //保存成員函數
      class A{
      public:
      A(string name) : name_(name){}
      void func3(int i) const {cout <<name_ << ", " << i << endl;}
      private:
      string name_;
      };
      int main()
      {
      cout << "main1 -----------------" << endl;
      //1. 保存普通函數
      std::function<void(int a)> func1_;
      func1_ = func1;
      func1_(2); //2
      cout << "\n\nmain2 -----------------" << endl;
      //2. 保存lambda表達式
      std::function<void()> func2_ = [](){cout << "hello lambda" << endl;};
      func2_(); //hello world
      cout << "\n\nmain3 -----------------" << endl;
      //3 保存成員函數
      std::function<void(const A&,int)> func3_ = &A::func3;
      A a("darren");
      func3_(a, 1);
      return 0;
      }
      
      

      2.2bind用法

      可將bind函數看作是一個通用的函數適配器,它接受一個可調用對象,生成一個新的可調用對象來“適
      應”原對象的參數列表。
      調用bind的一般形式:auto newCallable = bind(callable, arg_list);
      其中,newCallable本身是一個可調用對象,arg_list是一個逗號分隔的參數列表,對應給定的callable的
      參數。即,當我們調用newCallable時,newCallable會調用callable,并傳給它arg_list中的參數。

      3 可變模板參數

      C++11的新特性--可變模版參數(variadic templates)是C++11新增的最強大的特性之一,它對參數進
      行了高度泛化,它能表示0到任意個數、任意類型的參數

      3.1 可變模版參數的展開

      可變參數模板語法

      template <class... T>
      void f(T... args);
      

      上面的可變模版參數的定義當中,省略號的作用有兩個:

      1. 聲明一個參數包T... args,這個參數包中可以包含0到任意個模板參數;
      2. 在模板定義的右邊,可以將參數包展開成一個一個獨立的參數。

      上面的參數args前面有省略號,所以它就是一個可變模版參數,我們把帶省略號的參數稱為“參數包”,
      它里面包含了0到N(N>=0)個模版參數。我們無法直接獲取參數包args中的每個參數的,只能通過展
      開參數包的方式來獲取參數包中的每個參數,這是使用可變模版參數的一個主要特點,也是最大的難
      點,即如何展開可變模版參數。
      可變模版參數和普通的模版參數語義是一致的,所以可以應用于函數和類,即可變模版參數函數和可變
      模版參數類,然而,模版函數不支持偏特化,所以可變模版參數函數和可變模版參數類展開可變模版參
      數的方法還不盡相同,下面我們來分別看看他們展開可變模版參數的方法。

      3.1.1 可變模版參數函數

      //3-1-variable-parameter 一個簡單的可變模版參數函數
      #include <iostream>
      using namespace std;
      template <class... T>
      void f(T... args)
      {
      cout << sizeof...(args) << endl; //打印變參的個數
      }
      int main()
      {
      f(); //0
      f(1, 2); //2
      f(1, 2.5, ""); //3
      return 0;
      }
      

      上面的例子中,f()沒有傳入參數,所以參數包為空,輸出的size為0,后面兩次調用分別傳入兩個和三個
      參數,故輸出的size分別為2和3。由于可變模版參數的類型和個數是不固定的,所以我們可以傳任意類
      型和個數的參數給函數f。這個例子只是簡單的將可變模版參數的個數打印出來,如果我們需要將參數包
      中的每個參數打印出來的話就需要通過一些方法了。
      展開可變模版參數函數的方法一般有兩種:

      1. 通過遞歸函數來展開參數包,
      2. 是通過逗號表達式來展開參數包。
        下面來看看如何用這兩種方法來展開參數包。

      遞歸函數方式展開參數包

      通過遞歸函數展開參數包,需要提供一個參數包展開的函數和一個遞歸終止函數,遞歸終止函數正是用
      來終止遞歸的.

      //3-1-variable-parameter2 遞歸函數方式展開參數包
      #include <iostream>
      using namespace std;
      //遞歸終止函數
      void print()
      {
      cout << "empty" << endl;
      }
      //展開函數
      template <class T, class ...Args>
      void print(T head, Args... rest)
      {
      cout << "parameter " << head << endl;
      print(rest...);
      }
      int main(void)
      {
      print(1,2,3,4);
      return 0;
      }
      

      上例會輸出每一個參數,直到為空時輸出empty。展開參數包的函數有兩個,一個是遞歸函數,另外一
      個是遞歸終止函數,參數包Args...在展開的過程中遞歸調用自己,每調用一次參數包中的參數就會少一
      個,直到所有的參數都展開為止,當沒有參數時,則調用非模板函數print終止遞歸過程。
      上面的遞歸終止函數還可以寫成這樣:

      template <class T>
      void print(T t)
      {
      cout << t << endl;
      }
      

      逗號表達式展開參數包

      遞歸函數展開參數包是一種標準做法,也比較好理解,但也有一個缺點,就是必須要一個重載的遞歸終止
      函數,即必須要有一個同名的終止函數來終止遞歸,這樣可能會感覺稍有不便。有沒有一種更簡單的方
      式呢?其實還有一種方法可以不通過遞歸方式來展開參數包,這種方式需要借助逗號表達式和初始化列
      表。比如前面print的例子可以改成這樣:

      #include <iostream>
      using namespace std;
      template <class T>
      void printarg(T t)
      {
      cout << t << endl;
      }
      template <class ...Args>
      void expand(Args... args)
      {
      int arr[] = {(printarg(args), 0)...};
      }
      int main()
      {
      expand(1,2,3,4);
      return 0;
      }
      
      

      這個例子將分別打印出1,2,3,4四個數字。這種展開參數包的方式,不需要通過遞歸終止函數,是直接在expand函數體中展開的, printarg不是一個遞歸終止函數,只是一個處理參數包中每一個參數的函數。
      expand函數中的逗號表達式:(printarg(args), 0),先執行printarg(args),再得到逗號表達式的結果0。
      同時還用到了C++11的另外一個特性——初始化列表,通過初始化列表來初始化一個變長數組,{(printarg(args), 0)...}將會展開成((printarg(arg1),0), (printarg(arg2),0), (printarg(arg3),0), etc... ),最終會創建一個元素值都為0的數組int arr[sizeof...(Args)]。由于是逗號表達式,在創建數組的過程中會先執行逗號表達式前面的部分printarg(args)打印出參數,也就是說在構造int數組的過程中就將參數包展開了,這個數組的目的純粹是為了在數組構造的過程展開參數包。我們可以把上面的例子再進一步改進一下,將函數作為參數,就可以支持lambda表達式了,從而可以少寫一個遞歸終止函數了,具體代碼如下:

      #include <iostream>
      using namespace std;
      template<class F, class... Args>void expand(const F& f, Args&&...args)
      {
      //這里用到了完美轉發
      initializer_list<int>{(f(std::forward< Args>(args)),0)...};
      }
      int main()
      {
      expand([](int i){cout<<i<<endl;}, 1,2,3);
      return 0;
      }
      

      4 實現C++線程池

      見課上分析
      重點

      • 可變參數
      • std::future
      • decltype
      • packaged_task
      • bind
      • 支持可變參數列表
      • 支持獲取任務返回值

      zero_threadpool.h

      //zero_threadpool.h
      #ifndef ZERO_THREADPOOL_H
      #define ZERO_THREADPOOL_H
      
      #include <future>
      #include <functional>
      #include <iostream>
      #include <queue>
      #include <mutex>
      #include <memory>
      #ifdef WIN32
      #include <windows.h>
      #else
      #include <sys/time.h>
      #endif
      using namespace std;
      
      
      
      void getNow(timeval *tv);
      int64_t getNowMs();
      
      #define TNOW      getNow()
      #define TNOWMS    getNowMs()
      
      /////////////////////////////////////////////////
      /**
       * @file zero_thread_pool.h
       * @brief 線程池類,采用c++11來實現了,
       * 使用說明:
       * ZERO_ThreadPool tpool;
       * tpool.init(5);   //初始化線程池線程數
       * //啟動線程方式
       * tpool.start();
       * //將任務丟到線程池中
       * tpool.exec(testFunction, 10);    //參數和start相同
       * //等待線程池結束
       * tpool.waitForAllDone(1000);      //參數<0時, 表示無限等待(注意有人調用stop也會推出)
       * //此時: 外部需要結束線程池是調用
       * tpool.stop();
       * 注意:
       * ZERO_ThreadPool::exec執行任務返回的是個future, 因此可以通過future異步獲取結果, 比如:
       * int testInt(int i)
       * {
       *     return i;
       * }
       * auto f = tpool.exec(testInt, 5);
       * cout << f.get() << endl;   //當testInt在線程池中執行后, f.get()會返回數值5
       *
       * class Test
       * {
       * public:
       *     int test(int i);
       * };
       * Test t;
       * auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
       * //返回的future對象, 可以檢查是否執行
       * cout << f.get() << endl;
       */
      
      class ZERO_ThreadPool
      {
      protected:
          struct TaskFunc
          {
              TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
              { }
      
              std::function<void()>   _func;
              int64_t                _expireTime = 0;	//超時的絕對時間
          };
          typedef shared_ptr<TaskFunc> TaskFuncPtr;
      public:
          /**
          * @brief 構造函數
          *
          */
          ZERO_ThreadPool();
      
          /**
          * @brief 析構, 會停止所有線程
          */
          virtual ~ZERO_ThreadPool();
      
          /**
          * @brief 初始化.
          *
          * @param num 工作線程個數
          */
          bool init(size_t num);
      
          /**
          * @brief 獲取線程個數.
          *
          * @return size_t 線程個數
          */
          size_t getThreadNum()
          {
              std::unique_lock<std::mutex> lock(mutex_);
      
              return threads_.size();
          }
      
          /**
          * @brief 獲取當前線程池的任務數
          *
          * @return size_t 線程池的任務數
          */
          size_t getJobNum()
          {
              std::unique_lock<std::mutex> lock(mutex_);
              return tasks_.size();
          }
      
          /**
          * @brief 停止所有線程, 會等待所有線程結束
          */
          void stop();
      
          /**
          * @brief 啟動所有線程
          */
          bool start(); // 創建線程
      
          /**
          * @brief 用線程池啟用任務(F是function, Args是參數)
          *
          * @param ParentFunctor
          * @param tf
          * @return 返回任務的future對象, 可以通過這個對象來獲取返回值
          */
          template <class F, class... Args>
          auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
          {
              return exec(0,f,args...);
          }
      
          /**
          * @brief 用線程池啟用任務(F是function, Args是參數)
          *
          * @param 超時時間 ,單位ms (為0時不做超時控制) ;若任務超時,此任務將被丟棄
          * @param bind function
          * @return 返回任務的future對象, 可以通過這個對象來獲取返回值
          */
          /*
          template <class F, class... Args>
          它是c++里新增的最強大的特性之一,它對參數進行了高度泛化,它能表示0到任意個數、任意類型的參數
          auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
          std::future<decltype(f(args...))>:返回future,調用者可以通過future獲取返回值
          返回值后置
          */
          template <class F, class... Args>
          auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
          {
              int64_t expireTime =  (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 獲取現在時間
              //定義返回值類型
              using RetType = decltype(f(args...));  // 推導返回值
              // 封裝任務
              auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
      
              TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封裝任務指針,設置過期時間
              fPtr->_func = [task]() {  // 具體執行的函數
                  (*task)();
              };
      
              std::unique_lock<std::mutex> lock(mutex_);
              tasks_.push(fPtr);              // 插入任務
              condition_.notify_one();        // 喚醒阻塞的線程,可以考慮只有任務隊列為空的情況再去notify
      
              return task->get_future();;
          }
      
          /**
          * @brief 等待當前任務隊列中, 所有工作全部結束(隊列無任務).
          *
          * @param millsecond 等待的時間(ms), -1:永遠等待
          * @return           true, 所有工作都處理完畢
          *                   false,超時退出
          */
          bool waitForAllDone(int millsecond = -1);
      
      protected:
          /**
          * @brief 獲取任務
          *
          * @return TaskFuncPtr
          */
          bool get(TaskFuncPtr&task);
      
          /**
          * @brief 線程池是否退出
          */
          bool isTerminate() { return bTerminate_; }
      
          /**
          * @brief 線程運行態
          */
          void run();
      
      protected:
      
          /**
          * 任務隊列
          */
          queue<TaskFuncPtr> tasks_;
      
          /**
          * 工作線程
          */
          std::vector<std::thread*> threads_;
      
          std::mutex                mutex_;
      
          std::condition_variable   condition_;
      
          size_t                    threadNum_;
      
          bool                      bTerminate_;
      
          std::atomic<int>          atomic_{ 0 };
      };
      
      #endif // ZERO_THREADPOOL_H
      

      zero_threadpool.cpp

      #include "zero_threadpool.h"
      
      ZERO_ThreadPool::ZERO_ThreadPool()
          :  threadNum_(1), bTerminate_(false)
      {
      }
      
      ZERO_ThreadPool::~ZERO_ThreadPool()
      {
          stop();
      }
      
      bool ZERO_ThreadPool::init(size_t num)
      {
          std::unique_lock<std::mutex> lock(mutex_);
      
          if (!threads_.empty())
          {
              return false;
          }
      
          threadNum_ = num;
          return true;
      }
      
      void ZERO_ThreadPool::stop()
      {
          {
              std::unique_lock<std::mutex> lock(mutex_);
      
              bTerminate_ = true;
      
              condition_.notify_all();
          }
      
          for (size_t i = 0; i < threads_.size(); i++)
          {
              if(threads_[i]->joinable())
              {
                  threads_[i]->join();
              }
              delete threads_[i];
              threads_[i] = NULL;
          }
      
          std::unique_lock<std::mutex> lock(mutex_);
          threads_.clear();
      }
      
      bool ZERO_ThreadPool::start()
      {
          std::unique_lock<std::mutex> lock(mutex_);
      
          if (!threads_.empty())
          {
              return false;
          }
      
          for (size_t i = 0; i < threadNum_; i++)
          {
              threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
          }
          return true;
      }
      
      bool ZERO_ThreadPool::get(TaskFuncPtr& task)
      {
          std::unique_lock<std::mutex> lock(mutex_);
      
          if (tasks_.empty())
          {
              condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
          }
      
          if (bTerminate_)
              return false;
      
          if (!tasks_.empty())
          {
              task = std::move(tasks_.front());  // 使用了移動語義
      
              tasks_.pop();
      
              return true;
          }
      
          return false;
      }
      
      void ZERO_ThreadPool::run()  // 執行任務的線程
      {
          //調用處理部分
          while (!isTerminate()) // 判斷是不是要停止
          {
              TaskFuncPtr task;
              bool ok = get(task);        // 讀取任務
              if (ok)
              {
                  ++atomic_;
                  try
                  {
                      if (task->_expireTime != 0 && task->_expireTime  < TNOWMS )
                      {
                          //超時任務,是否需要處理?
                      }
                      else
                      {
                          task->_func();  // 執行任務
                      }
                  }
                  catch (...)
                  {
                  }
      
                  --atomic_;
      
                  //任務都執行完畢了
                  std::unique_lock<std::mutex> lock(mutex_);
                  if (atomic_ == 0 && tasks_.empty())
                  {
                      condition_.notify_all();  // 這里只是為了通知waitForAllDone
                  }
              }
          }
      }
      
      bool ZERO_ThreadPool::waitForAllDone(int millsecond)
      {
          std::unique_lock<std::mutex> lock(mutex_);
      
          if (tasks_.empty())
              return true;
      
          if (millsecond < 0)
          {
              condition_.wait(lock, [this] { return tasks_.empty(); });
              return true;
          }
          else
          {
              return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });
          }
      }
      
      
      int gettimeofday(struct timeval &tv)
      {
      #if WIN32
          time_t clock;
          struct tm tm;
          SYSTEMTIME wtm;
          GetLocalTime(&wtm);
          tm.tm_year   = wtm.wYear - 1900;
          tm.tm_mon   = wtm.wMonth - 1;
          tm.tm_mday   = wtm.wDay;
          tm.tm_hour   = wtm.wHour;
          tm.tm_min   = wtm.wMinute;
          tm.tm_sec   = wtm.wSecond;
          tm. tm_isdst  = -1;
          clock = mktime(&tm);
          tv.tv_sec = clock;
          tv.tv_usec = wtm.wMilliseconds * 1000;
      
          return 0;
      #else
          return ::gettimeofday(&tv, 0);
      #endif
      }
      
      void getNow(timeval *tv)
      {
      #if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
      
          int idx = _buf_idx;
          *tv = _t[idx];
          if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc)
          {
              addTimeOffset(*tv, idx);
          }
          else
          {
              TC_Common::gettimeofday(*tv);
          }
      #else
          gettimeofday(*tv);
      #endif
      }
      
      int64_t getNowMs()
      {
          struct timeval tv;
          getNow(&tv);
      
          return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
      }
      
      

      main.cpp

      #include <iostream>
      #include <zero_threadpool.h>
      using namespace std;
      
      void func0()
      {
          cout << "func0()" << endl;
      }
      
      void func1(int a)
      {
          cout << "func1() a=" << a << endl;
      }
      
      void func2(int a, string b)
      {
          cout << "func2() a=" << a << ", b=" << b<< endl;
      }
      
      
      void test1() // 簡單測試線程池
      {
          ZERO_ThreadPool threadpool;
          threadpool.init(1);
          threadpool.start(); // 啟動線程池
          // 假如要執行的任務
          threadpool.exec(1000,func0);
          threadpool.exec(func1, 10);
          threadpool.exec(func2, 20, "darren");
          threadpool.waitForAllDone();
          threadpool.stop();
      }
      
      int func1_future(int a)
      {
          cout << "func1() a=" << a << endl;
          return a;
      }
      
      string func2_future(int a, string b)
      {
          cout << "func1() a=" << a << ", b=" << b<< endl;
          return b;
      }
      
      void test2() // 測試任務函數返回值
      {
          ZERO_ThreadPool threadpool;
          threadpool.init(1);
          threadpool.start(); // 啟動線程池
          // 假如要執行的任務
          std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
          std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");
      //  auto result2 = threadpool.exec(func2_future, 20, "darren");
      
          std::cout << "result1: " << result1.get() << std::endl;
          std::cout << "result2: " << result2.get() << std::endl;
          threadpool.waitForAllDone();
          threadpool.stop();
      }
      
      class Test
      {
      public:
          int test(int i){
              cout << _name << ", i = " << i << endl;
              return i;
          }
          void setName(string name){
              _name = name;
          }
          string _name;
      };
      
      void test3() // 測試類對象函數的綁定
      {
          ZERO_ThreadPool threadpool;
          threadpool.init(1);
          threadpool.start(); // 啟動線程池
          Test t1;
          Test t2;
          t1.setName("Test1");
          t2.setName("Test2");
          auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
          auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
          threadpool.waitForAllDone();
          cout << "t1 " << f1.get() << endl;
          cout << "t2 " << f2.get() << endl;
      }
      int main()
      {
      //    test1(); // 簡單測試線程池
      //    test2(); // 測試任務函數返回值
          test3(); // 測試類對象函數的綁定
          cout << "main finish!" << endl;
          return 0;
      }
      
      

      5異常處理

      C++ Core Guidelines (isocpp.github.io)
      std::exception_ptr
      Make exception_ptr
      重點參考:MSVC 中的異常處理

      5.1 異常處理基本語法

      C++的提供的關于異常的三個關鍵字: try{ throw } catch{ }

      #include <stdexcept>
      #include <limits>
      #include <iostream>
      using namespace std;
      void MyFunc(int c)
      {
      if (c > numeric_limits< char> ::max())
      throw invalid_argument("throw MyFunc argument too large.");
      //...
      }
      int main()
      {
      try
      {
      MyFunc(256); //cause an exception to throw
      }
      catch (invalid_argument& e)
      {
      cerr << "catch " << e.what() << endl;
      return -1;
      }
      //...
      return 0;
      }
      

      try在塊中,如果引發異常,則它將被其類型與異常匹配的第一個關聯catch塊捕獲。 換言之,執行
      throw語句跳轉到catch語句。 如果未找到可用的 catch 塊,std::terminate則將調用并退出
      程序。 在 c + + 中,可能會引發任何類型;但是,我們建議你引發直接或間接從std::exception派生
      的類型。 在上面的示例中,異常類型invalid_argument 在標頭文件的標準庫 中定義。
      語法比較簡單:throw(拋出)一個數據,然后再用catch(捕獲)接收。throw的數據類型可以是任意
      的,所以當然也可以是一個對象:

      struct Test
      {
      Test(const char* s, int i, double d)
      : s(s)
      , i(i)
      , d(d) {};
      const char* s;
      int i;
      double d;
      void print() const
      {
      printf("%s %d %.2f\n", s, i, d);
      }
      };
      int main()
      {
      try
      {
      throw Test("LLF", 520, 13.14);
      }
      catch (const Test& e)
      {
      e.print();
      }
      }
      
      

      5.2 基本指導原則

      強大的錯誤處理對于任何編程語言都很有挑戰性。 盡管異常提供了多個支持良好錯誤處理的功能,但它
      們無法為你完成所有工作。 若要實現異常機制的優點,請在設計代碼時記住異常。

      • 使用斷言來檢查絕不應發生的錯誤。 使用異常來檢查可能出現的錯誤,例如,公共函數參數的輸入
        驗證中的錯誤。 有關詳細信息,請參閱 異常與斷言 部分。
      • 當處理錯誤的代碼與通過一個或多個干預函數調用檢測到錯誤的代碼分離時,使用異常。 當處理錯誤的代碼與檢測到錯誤的代碼緊密耦合時,考慮是否使用錯誤代碼而不是在性能關鍵循環中。
      • 對于可能引發或傳播異常的每個函數,請提供以下三種異常保證之一:強保障、基本保證或nothrow (noexcept) 保證。 有關詳細信息,請參閱 如何:設計異常安全性
      • 按值引發異常,按引用來捕獲異常。不要捕獲無法處理的內容。
      • 不要使用 c + + 11 中已棄用的異常規范。 有關詳細信息,請參閱異常規范和 noexcept 部分。
      • 應用時使用標準庫異常類型。 從exception 類層次結構派生自定義異常類型。
      • 不允許對析構函數或內存釋放函數進行轉義。

      5.3 Exception 類

      對上面代碼的分析,可以看到,發生異常時拋出一個對象而不是一個簡單的數據類型,可以傳遞更多的
      錯誤信息,但是這樣的話,我們需要針對不同的異常情況定義不同的類。有沒有統一的解決方法?
      C++給出來了一個標準的異常類Exception。
      看一下定義:

      /**
      * @brief Base class for all library exceptions.
      *
      * This is the base class for all exceptions thrown by the standard
      * library, and by certain language expressions. You are free to derive
      * your own %exception classes, or use a different hierarchy, or to
      * throw non-class data (e.g., fundamental types).
      */
      class exception
      {
      public:
      exception() noexcept { }
      virtual ~exception() noexcept;
      exception(const exception&) = default;
      exception& operator=(const exception&) = default;
      exception(exception&&) = default;
      exception& operator=(exception&&) = default;
      /** Returns a C-style character string describing the general cause
      * of the current error. */
      virtual const char* what() const noexcept;
      };
      

      主要就是定義了一個what的虛函數,返回C_style的字符串,主要作用就是描述發生一場的原因。在使用
      的時候,往往需要自定義一個異常類:

      #include<exception>
      #include<iostream>
      using namespace std;
      class MyException:public exception
      {
      public:
      const char* what()const throw(){ //throw () 表示不允許任何異常產生
      return "ERROR! Don't divide a number by integer zero.\n";
      }
      };
      void check(int y) throw(MyException)
      { //throw (MyException)表示只允許myException的異常發生
      if(y==0) throw MyException();
      }
      int main()
      {
      int x=100,y=0;
      try{
      check(y);
      cout<<x/y;
      }catch(MyException& me){
      cout<<me.what();
      cout << "finish exception\n";
      return -1;
      }
      cout << "finish ok\n";
      return 0;
      }
      

      5.4 標準異常擴展

      C++定義了一些標準的異常,用于各種場景,他們都是繼承自std::exception的:

      下表是對上面層次結構中出現的每個異常的說明:

      異常 描述
      std::exception 該異常是所有標準 C++ 異常的父類。
      std::bad_alloc 該異常可以通過 new 拋出。
      std::bad_cast 該異常可以通過 dynamic_cast 拋出。
      std::bad_exception 這在處理 C++ 程序中無法預期的異常時非常有用。
      std::bad_typeid 該異常可以通過 typeid 拋出。
      std::logic_error 理論上可以通過讀取代碼來檢測到的異常。
      std::domain_error 當使用了一個無效的數學域時,會拋出該異常
      std::invalid_argument 當使用了無效的參數時,會拋出該異常。
      std::length_error 當創建了太長的 std::string 時,會拋出該異常。
      std::out_of_range 該異常可以通過方法拋出,例如 std::vector 和std::bitset<>::operator。
      std::runtime_error 理論上不可以通過讀取代碼來檢測到的異常。
      std::overflow_error 當發生數學上溢時,會拋出該異常。
      std::range_error 當嘗試存儲超出范圍的值時,會拋出該異常。
      std::underflow_error 當發生數學下溢時,會拋出該異常。

      5.5 std::exception_ptr

      根據官方文檔的介紹 std::exception_ptr是一個指向 exception object 的共享智能指針。
      關鍵在于理解 “exception object” 是什么,是std::exception類的對象嗎?這種理解是不準的,按我的理
      解,所謂“exception object” 應該是被throw拋出的對象,根據我們上面的學習,塔既可以是int、double
      等簡單的數據類型、也可以是自定義的類對象,當然也可以是std::exception類對象。
      有四個操作std::exception_ptr的函數:

      //5-5-exception_ptr exception_ptr example
      #include <iostream> // std::cout
      #include <exception> // std::exception_ptr, std::current_exception,
      std::rethrow_exception
      #include <stdexcept> // std::logic_error
      int main ()
      {
      std::exception_ptr p;
      try {
      throw std::logic_error("some logic_error exception"); // throws
      } catch(const std::exception& e) {
      p = std::current_exception();
      std::cout << "exception caught, but continuing...\n";
      }
      std::cout << "(after exception)\n";
      try {
      std::rethrow_exception (p);
      } catch (const std::exception& e) {
      std::cout << "exception caught: " << e.what() << '\n';
      }
      return 0;
      }
      
      
      • 首先定義了一個 std::exception_ptr變量p
      • 然后在第一個try中,拋出了一個標準異常(見上)
      • 在第一個catch中,調用 current_exception() ,這樣就讓p指向了捕獲的異常對象
      • 然后在第二個try中,調用 rethrow_exception ,將異常重新拋出
      • 然后在第二個catch中,依然正常的捕獲到了這個異常對象
      // 5-5-make_exception_ptr make_exception_ptr example
      #include <iostream> // std::cout
      #include <exception> // std::make_exception_ptr, std::rethrow_exception
      #include <stdexcept> // std::logic_error
      int main ()
      {
      auto p = std::make_exception_ptr(std::logic_error("logic_error"));
      try {
      std::rethrow_exception (p); // 重新拋出異常
      } catch (const std::exception& e) {
      std::cout << "exception caught: " << e.what() << '\n'; // 捕獲異常
      }
      return 0;
      }
      

      首先創建了一個異常make_exception_ptr
      然后再try中拋出該異常
      接著在catch捕獲拋出的異常。

      //5-5-nested_exception nested_exception example
      #include <iostream> // std::cerr
      #include <exception> // std::exception, std::throw_with_nested,
      std::rethrow_if_nested
      #include <stdexcept> // std::logic_error
      // recursively print exception whats:
      void print_what (const std::exception& e)
      {
      std::cout << __FUNCTION__ << ", L"<< __LINE__ << ", what:" << e.what() <<
      '\n';
      try {
      std::rethrow_if_nested(e);
      } catch (const std::exception& nested) {
      std::cerr << "nested: ";
      print_what(nested);
      }
      }
      // throws an exception nested in another:
      void throw_nested()
      {
      try {
      throw std::logic_error ("first");
      } catch (const std::exception& e) {
      std::throw_with_nested(std::logic_error("second"));
      }
      }
      int main ()
      {
      try {
      std::cout << __FUNCTION__ << ", L"<< __LINE__ << std::endl;
      throw_nested();
      } catch (std::exception& e) {
      std::cout << __FUNCTION__ << ", L"<< __LINE__ << std::endl;
      print_what(e);
      }
      return 0;
      }
      
      

      推薦一個零聲學院免費教程,個人覺得老師講得不錯,
      分享給大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
      fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,
      TCP/IP,協程,DPDK等技術內容,點擊立即學習:
      服務器
      音視頻
      dpdk
      Linux內核

      posted @ 2022-08-28 18:59  飄雨的河  閱讀(292)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 午夜福利片1000无码免费| 日产精品99久久久久久| 久久青草国产精品一区| 庆安县| 亚洲av永久无码精品天堂久久| 日本一区三区高清视频| 中国女人熟毛茸茸A毛片| 欧洲精品码一区二区三区| 内射视频福利在线观看| 免费十八禁一区二区三区| 99热久久这里只有精品| 成人亚欧欧美激情在线观看| 国产亚洲精品久久77777| 在线免费观看亚洲天堂av| 亚洲熟女精品一区二区| 亚洲一区二区国产av| 国产av中文字幕精品| 扒开双腿疯狂进出爽爽爽| 国产亚洲精品久久久久婷婷图片| 在线亚洲妇色中文色综合| 国产成人精品免费视频大全| 亚洲午夜激情久久加勒比| 中文国产人精品久久蜜桃| 99久久婷婷国产综合精品| 91中文字幕一区二区| 国产精品一区二区三区三级| 99九九视频高清在线| 安泽县| 亚洲综合一区国产精品| 国产精品成人午夜久久| 粗了大了 整进去好爽视频| 亚欧成人精品一区二区乱| 亚洲综合日韩av在线| 亚洲欧美一区二区成人片| 制服丝袜人妻有码无码中文字幕| 久久综合开心激情五月天| 四虎成人在线观看免费| 日本一区三区高清视频| 一区二区偷拍美女撒尿视频| 在线A级毛片无码免费真人| 精品国产成人午夜福利|