<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      C++11線程池

      基于C++11實現線程池的工作原理.

      目錄

      基于C++11實現線程池的工作原理.
      簡介
      線程池的組成
      1、線程池管理器
      2、工作線程
      3、任務接口,
      4、任務隊列
      線程池工作的四種情況.
      1、主程序當前沒有任務要執行,線程池中的任務隊列為空閑狀態.
      2、主程序添加小于等于線程池中線程數量的任務.
      3、主程序添加任務數量大于當前線程池中線程數量的任務.
      4、主程序添加任務數量大于當前線程池中線程數量的任務,且任務緩沖隊列已滿.
      實現
      測試程序
      start() 、stop()
      addTask()、PriorityTaskQueue
      源碼下載

      簡介

      線程池(thread pool):一種線程的使用模式,線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度??捎镁€程數量應該取決于可用的并發處理器、處理器內核、內存、網絡sockets等的數量。

      線程池的組成

      1、線程池管理器
      創建一定數量的線程,啟動線程,調配任務,管理著線程池。
      本篇線程池目前只需要啟動(start()),停止方法(stop()),及任務添加方法(addTask).
      start()創建一定數量的線程池,進行線程循環.
      stop()停止所有線程循環,回收所有資源.
      addTask()添加任務.

      2、工作線程
      線程池中線程,在線程池中等待并執行分配的任務.
      本篇選用條件變量實現等待與通知機制.

      3、任務接口,
      添加任務的接口,以供工作線程調度任務的執行。

      4、任務隊列
      用于存放沒有處理的任務。提供一種緩沖機制
      同時任務隊列具有調度功能,高優先級的任務放在任務隊列前面;本篇選用priority_queue 與pair的結合用作任務優先隊列的結構.

      線程池工作的四種情況.

      假設我們的線程池大小為3,任務隊列目前不做大小限制.

      1、主程序當前沒有任務要執行,線程池中的任務隊列為空閑狀態.
      此情況下所有工作線程處于空閑的等待狀態,任務緩沖隊列為空.

      2、主程序添加小于等于線程池中線程數量的任務.
      此情況基于情形1,所有工作線程已處在等待狀態,主線程開始添加三個任務,添加后通知(notif())喚醒線程池中的線程開始取(take())任務執行. 此時的任務緩沖隊列還是空。

      3、主程序添加任務數量大于當前線程池中線程數量的任務.
      此情況發生情形2后面,所有工作線程都在工作中,主線程開始添加第四個任務,添加后發現現在線程池中的線程用完了,于是存入任務緩沖隊列。工作線程空閑后主動從任務隊列取任務執行.

      4、主程序添加任務數量大于當前線程池中線程數量的任務,且任務緩沖隊列已滿.
      此情況發生情形3且設置了任務緩沖隊列大小后面,主程序添加第N個任務,添加后發現池子中的線程用完了,任務緩沖隊列也滿了,于是進入等待狀態、等待任務緩沖隊列中的任務騰空通知。
      但是要注意這種情形會阻塞主線程,本篇暫不限制任務隊列大小,必要時再來優化.

      實現

      等待通知機制通過條件變量實現,Logger和CurrentThread,用于調試,可以無視.

      #ifndef _THREADPOOL_HH
      #define _THREADPOOL_HH
      
      #include <vector>
      #include <utility>
      #include <queue>
      #include <thread>
      #include <functional>
      #include <mutex>
      
      #include "Condition.hh"
      
      class ThreadPool{
      public:
        static const int kInitThreadsSize = 3;
        enum taskPriorityE { level0, level1, level2, };
        typedef std::function<void()> Task;
        typedef std::pair<taskPriorityE, Task> TaskPair;
      
        ThreadPool();
        ~ThreadPool();
      
        void start();
        void stop();
        void addTask(const Task&);
        void addTask(const TaskPair&);
      
      private:
        ThreadPool(const ThreadPool&);//禁止復制拷貝.
        const ThreadPool& operator=(const ThreadPool&);
      
        struct TaskPriorityCmp
        {
          bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2)
          {
              return p1.first > p2.first; //first的小值優先
          }
        };
      
        void threadLoop();
        Task take();
      
        typedef std::vector<std::thread*> Threads;
        typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks;
      
        Threads m_threads;
        Tasks m_tasks;
      
        std::mutex m_mutex;
        Condition m_cond;
        bool m_isStarted;
      };
      
      #endif
      
      //Cpp
      
      #include <assert.h>
      
      #include "Logger.hh" // debug
      #include "CurrentThread.hh" // debug
      #include "ThreadPool.hh"
      
      ThreadPool::ThreadPool()
        :m_mutex(),
        m_cond(m_mutex),
        m_isStarted(false)
      {
      
      }
      
      ThreadPool::~ThreadPool()
      {
        if(m_isStarted)
        {
          stop();
        }
      }
      
      void ThreadPool::start()
      {
        assert(m_threads.empty());
        m_isStarted = true;
        m_threads.reserve(kInitThreadsSize);
        for (int i = 0; i < kInitThreadsSize; ++i)
        {
          m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this)));
        }
      
      }
      
      void ThreadPool::stop()
      {
        LOG_TRACE << "ThreadPool::stop() stop.";
        {
          std::unique_lock<std::mutex> lock(m_mutex);
          m_isStarted = false;
          m_cond.notifyAll();
          LOG_TRACE << "ThreadPool::stop() notifyAll().";
        }
      
        for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it)
        {
          (*it)->join();
          delete *it;
        }
        m_threads.clear();
      }
      
      
      void ThreadPool::threadLoop()
      {
        LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start.";
        while(m_isStarted)
        {
          Task task = take();
          if(task)
          {
            task();
          }
        }
        LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit.";
      }
      
      void ThreadPool::addTask(const Task& task)
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        /*while(m_tasks.isFull())
          {//when m_tasks have maxsize
            cond2.wait();
          }
        */
        TaskPair taskPair(level2, task);
        m_tasks.push(taskPair);
        m_cond.notify();
      }
      
      void ThreadPool::addTask(const TaskPair& taskPair)
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        /*while(m_tasks.isFull())
          {//when m_tasks have maxsize
            cond2.wait();
          }
        */
        m_tasks.push(taskPair);
        m_cond.notify();
      }
      
      ThreadPool::Task ThreadPool::take()
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        //always use a while-loop, due to spurious wakeup
        while(m_tasks.empty() && m_isStarted)
        {
          LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait.";
          m_cond.wait(lock);
        }
      
        LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup.";
      
        Task task;
        Tasks::size_type size = m_tasks.size();
        if(!m_tasks.empty() && m_isStarted)
        {
          task = m_tasks.top().second;
          m_tasks.pop();
          assert(size - 1 == m_tasks.size());
          /*if (TaskQueueSize_ > 0)
          {
            cond2.notify();
          }*/
        }
      
        return task;
      
      }
      

      測試程序
      start() 、stop()
      測試線程池基本的創建退出工作,及檢測資源是否正?;厥?

      int main()
      {
        {
        ThreadPool threadPool;
        threadPool.start();
      
        getchar();
        }
      
        getchar();
      
        return 0;
      }
      
      ./test.out 
      ```cpp
      2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start.
      2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait.
      2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start.
      2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait.
      2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start.
      2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait.
      2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start.
      2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait.
      
      2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop.
      2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll().
      2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup.
      2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit.
      2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup.
      2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit.
      2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup.
      2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit.
      2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup.
      2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit.
      

      addTask()、PriorityTaskQueue
      測試添加任務接口,及優先任務隊列.

      主線程首先添加了5個普通任務、 1s后添加一個高優先級任務,當前3個線程中的最先一個空閑后,會最先執行后面添加的priorityFunc().

      std::mutex g_mutex;
      
      void priorityFunc()
      {
        for (int i = 1; i < 4; ++i)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::lock_guard<std::mutex> lock(g_mutex);
            LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
        }
      
      }
      
      void testFunc()
      {
        // loop to print character after a random period of time
        for (int i = 1; i < 4; ++i)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::lock_guard<std::mutex> lock(g_mutex);
            LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
        }
      
      }
      
      
      int main()
      {
        ThreadPool threadPool;
        threadPool.start();
      
        for(int i = 0; i < 5 ; i++)
          threadPool.addTask(testFunc);
      
        std::this_thread::sleep_for(std::chrono::seconds(1));
      
        threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc));
      
        getchar();
        return 0;
      }
      

      ./test.out

      2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start.
      2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
      2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start.
      2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
      2018-11-25 18:24:20.887610 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4122 start.
      2018-11-25 18:24:20.887620 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
      2018-11-25 18:24:21.887779 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4120] output
      2018-11-25 18:24:21.887813 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output
      2018-11-25 18:24:21.888909 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output
      2018-11-25 18:24:22.888049 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4120] output
      2018-11-25 18:24:22.888288 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output
      2018-11-25 18:24:22.889978 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output
      2018-11-25 18:24:23.888467 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4120] output
      2018-11-25 18:24:23.888724 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
      2018-11-25 18:24:23.888778 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output
      2018-11-25 18:24:23.888806 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
      2018-11-25 18:24:23.890413 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output
      2018-11-25 18:24:23.890437 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
      2018-11-25 18:24:24.889247 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [1at thread [ 4120] output
      2018-11-25 18:24:24.891187 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output
      2018-11-25 18:24:24.893163 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output
      2018-11-25 18:24:25.889567 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [2at thread [ 4120] output
      2018-11-25 18:24:25.891477 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output
      2018-11-25 18:24:25.893450 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output
      2018-11-25 18:24:26.890295 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [3at thread [ 4120] output
      2018-11-25 18:24:26.890335 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4120 wait.
      2018-11-25 18:24:26.892265 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output
      2018-11-25 18:24:26.892294 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4121 wait.
      2018-11-25 18:24:26.894274 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output
      2018-11-25 18:24:26.894299 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4122 wait.
      
      2018-11-25 18:24:35.359003 [TRACE] [ThreadPool.cpp:37] [stop] ThreadPool::stop() stop.
      2018-11-25 18:24:35.359043 [TRACE] [ThreadPool.cpp:42] [stop] ThreadPool::stop() notifyAll().
      2018-11-25 18:24:35.359061 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
      2018-11-25 18:24:35.359067 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4120 exit.
      2018-11-25 18:24:35.359080 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
      2018-11-25 18:24:35.359090 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4122 exit.
      2018-11-25 18:24:35.359123 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
      2018-11-25 18:24:35.359130 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4121 exit.
      

      源碼下載
      如果有需要,可以訪問我的GitHub進行下載: https://github.com/BethlyRoseDaisley/ThreadPool

      源碼更新 ---2019-3-7
      1、去除優先隊列 該用deque
      2、去除condition currentthread 源文件
      3、文件重命名
      4、增加簡易logger

      作者 —— 艾露米婭娜
      出處:http://www.rzrgm.cn/ailumiyana/

      參考
      http://www.rzrgm.cn/Tattoo-Welkin/p/10335254.html

      posted @ 2021-05-09 10:09  demianzhang  閱讀(513)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 加勒比无码人妻东京热| 亚洲AV日韩AV永久无码电影| 亚洲欧洲日产国码无码久久99| 久久精品中文字幕免费| 国产微拍一区二区三区四区| 国产99久久亚洲综合精品西瓜tv| 四虎国产精品永久在线下载| jk白丝喷浆| 亚洲av无码乱码在线观看野外| 91久久精品国产性色也| 色熟妇人妻久久中文字幕| 无码人妻丰满熟妇区五十路在线| 躁躁躁日日躁| 99热门精品一区二区三区无码| 国产亚洲欧美精品久久久| 福利一区二区在线视频| 日韩69永久免费视频| 国产一区二区三区免费观看| 激情综合五月| 一个人看的www免费高清视频| 南江县| 亚洲av片在线免费观看| 国产AV福利第一精品| 激情97综合亚洲色婷婷五| 日韩高清不卡免费一区二区| 老鸭窝| 国产成人精彩在线视频| 亚洲免费最大黄页网站| 91亚洲国产成人久久蜜臀| 国产a级三级三级三级| 精品无码国产污污污免费| 少妇无套内谢免费视频| 2022最新国产在线不卡a| gogogo高清在线观看视频中文| 亚洲成av人片天堂网无码| 曰韩高清砖码一二区视频| 性色在线视频精品| 亚洲精品无码久久一线| 亚洲AV日韩AV激情亚洲| 日日摸夜夜添狠狠添欧美| 霍邱县|