線程模塊
概述
該模塊基于pthread實現。sylar說,由于c++11中的thread也是由pthread封裝實現的,并且沒有提供讀寫互斥量,讀寫鎖,自旋鎖等,所以自己封裝了pthread。包括以下類:
-
Thread:線程類,構造函數傳入線程入口函數和線程名稱,線程入口函數類型為void(),如果帶參數,則需要用std::bind進行綁定。線程類構造之后線程即開始運行,構造函數在線程真正開始運行之后返回。
-
線程同步類(這部分被拆分到mutex.h)中:
-
Semaphore: 計數信號量,基于sem_t實現
-
Mutex: 互斥鎖,基于pthread_mutex_t實現
-
RWMutex: 讀寫鎖,基于pthread_rwlock_t實現
-
Spinlock: 自旋鎖,基于pthread_spinlock_t實現
-
CASLock: 原子鎖,基于std::atomic_flag實現
線程模塊主要由Thread類實現
- class Thread:實現線程的封裝
關于線程id的問題,在獲取線程id時使用syscall獲得唯一的線程id
進程pid: getpid()
線程tid: pthread_self() //進程內唯一,但是在不同進程則不唯一。
線程pid: syscall(SYS_gettid) //系統內是唯一的
鎖模塊介紹
信號量
信號量(Semaphore)是一種用于多線程同步的機制,能夠控制多個線程對共享資源的訪問。信號量的關鍵作用是通過計數器來管理訪問權限,從而避免競爭條件。信號量有兩個主要操作:
- 等待(P 操作,wait 或 down):減少信號量的值。如果信號量的值為0,則調用線程將被阻塞,直到信號量的值大于0。
- 釋放(V 操作,signal 或 up):增加信號量的值。如果有線程因為等待操作而被阻塞,則喚醒其中一個線程。
有以下信號量函數:
- sem_init(): 初始化一個未命名的信號量。
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
// sem: 指向信號量對象的指針。
// pshared: 指定信號量是否在進程間共享。0 表示信號量在同一進程的線程間共享,非0表示信號量在進程間共享。
// value: 信號量的初始值。
- sem_destroy(): 銷毀一個未命名的信號量。
#include <semaphore.h>
int sem_destroy(sem_t *sem);
// sem: 指向信號量對象的指針。
- sem_wait(): 等待(減少)信號量。如果信號量的值為0,調用線程將阻塞直到信號量的值大于0。
#include <semaphore.h>
int sem_wait(sem_t *sem);
// sem: 指向信號量對象的指針。
- sem_trywait(): 嘗試等待(減少)信號量。如果信號量的值為0,該函數立即返回,并不會阻塞。
#include <semaphore.h>
int sem_trywait(sem_t *sem);
// sem: 指向信號量對象的指針。
- sem_post(): 釋放(增加)信號量。如果有其他線程因等待信號量而被阻塞,該函數會喚醒其中一個線程。
#include <semaphore.h>
int sem_post(sem_t *sem);
// sem: 指向信號量對象的指針。
- sem_getvalue(): 獲取信號量的當前值。
#include <semaphore.h>
int sem_getvalue(sem_t *sem);
// sem: 指向信號量對象的指針。
// sval: 指向整數的指針,用于存儲信號量的當前值。
Sylar對其進行了封裝,形成class Semaphore
// 信號量,它本質上是一個長整型的數
sem_t m_semaphore;
// 構造函數
Semaphore::Semaphore(uint32_t count) {
if(sem_init(&m_semaphore, 0, count)) {
throw std::logic_error("sem_init error");
}
}
// 析構函數
Semaphore::~Semaphore() {
sem_destroy(&m_semaphore);
}
// 獲取信號量
void Semaphore::wait() {
if(sem_wait(&m_semaphore)) {
throw std::logic_error("sem_wait error");
}
}
// 釋放信號量
void Semaphore::notify() {
if(sem_post(&m_semaphore)) {
throw std::logic_error("sem_post error");
}
}
互斥鎖
pthread_mutex_t是Pthreads庫中的一種互斥鎖(Mutex),用于在線程間提供同步機制,確保在多線程環境中對共享資源的互斥訪問。相關函數:
#include <pthread.h>
pthread_mutex_t mutex;
// 初始化
pthread_mutex_init(&mutex, NULL);
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
// 加鎖互斥鎖
pthread_mutex_lock(&mutex);
// 嘗試加鎖互斥鎖
pthread_mutex_trylock(&mutex);
// 解鎖互斥鎖
pthread_mutex_unlock(&mutex);
為方便封裝各種鎖,這里定義了3個結構體,都在構造函數時自動lock,在析構時自動unlock,這樣可以簡化鎖的操作,避免忘記解鎖導致死鎖。
- ScopedLockImpl:用來分裝互斥量,自旋鎖,原子鎖
- ReadScopedLockImpl && WriteScopedLockImpl:用來封裝讀寫鎖
Sylar對其進行了封裝,形成class Mutex
// 互斥量
pthread_mutex_t m_mutex;
// 構造函數
Mutex () {
pthread_mutex_init(&m_mutex, nullptr);
}
// 析構函數
~Mutex () {
pthread_mutex_destroy(&m_mutex);
}
// lock(加鎖)
void lock() {
pthread_mutex_lock(&m_mutex);
}
// unlock(解鎖)
void unlock() {
pthread_mutex_unlock(&m_mutex);
}
自旋鎖
與mutex不同,自旋鎖不會使線程進入睡眠狀態,而是在獲取鎖時進行忙等待,直到鎖可用。當鎖被釋放時,等待獲取鎖的線程將立即獲取鎖,從而避免了線程進入和退出睡眠狀態的額外開銷。
Sylar中基于pthread_spinlock_t及其相關函數封裝了class Spinlock
// 自旋鎖定義
pthread_spinlock_t m_mutex;
// 構造函數
Spinlock() {
pthread_spin_init(&m_mutex, 0);
}
// 析構函數
~Spinlock() {
pthread_spin_destroy(&m_mutex);
}
// 加鎖
void lock() {
pthread_spin_lock(&m_mutex);
}
// 解鎖
void unlock() {
pthread_spin_unlock(&m_mutex);
}
讀寫鎖
讀寫鎖是一種同步機制,用于在多線程環境下對共享資源進行訪問控制。與互斥鎖不同,讀寫鎖允許多個線程同時讀取共享資源,但只允許一個線程寫入共享資源。這樣可以提高程序的性能和效率,但需要注意避免讀寫鎖死鎖等問題。
Sylar基于pthread_rwlock_t及其相關函數封裝了class RWMutex
// 讀寫鎖
pthread_rwlock_t m_lock;
// RWMutex(構造函數)
RWMutex() {
pthread_rwlock_init(&m_lock, nullptr);
}
// ~RWMutex(析構函數)
~RWMutex() {
pthread_rwlock_destroy(&m_lock);
}
// rdlock(加讀鎖)
void rdlock() {
pthread_rwlock_rdlock(&m_lock);
}
// wrlock(加寫鎖)
void wrlock() {
pthread_rwlock_wrlock(&m_lock);
}
// unlock(解鎖)
void unlock() {
pthread_rwlock_unlock(&m_lock);
}
原子鎖
在多線程編程中,原子標志位通常用于實現簡單的鎖機制,以確保對共享資源的訪問是互斥的。使用atomic_flag.clear()可以輕松地重置標志位,使之再次可用于控制對共享資源的訪問。需要注意的是,由于該函數是一個原子操作,因此可以安全地在多個線程之間使用,而無需擔心競態條件和數據競爭等問題
class CASLock(原子鎖)實現如下:
- 成員變量
// 線程id
pid_t m_id = -1;
// 線程結構
pthread_t m_thread = 0;
// 線程執行函數
std::function<void()> m_cb;
// 線程名稱
std::string m_name;
// 信號量
Semaphore m_semaphore;
// m_mutex是一個原子布爾類型,具有特殊的原子性質,可以用于實現線程間同步和互斥。
// volatile關鍵字表示該變量可能會被異步修改,因此編譯器不會對其進行優化,而是每次都從內存中讀取該變量的值。
volatile std::atomic_flag m_mutex;
// CASLock(構造函數)
CASLock () {
m_mutex.clear();
}
// lock(加鎖)
void lock() {
while (std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
}
// unlock(解鎖)
void unlock() {
std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
}
線程模塊
class Thread的實現
定義了兩個線程局部變量用于指向當前線程以及線程的名稱。
static thread_local是C++中的一個關鍵字組合,用于定義靜態線程本地存儲變量。具體來說,當一個變量被聲明為static thread_local時,它會在每個線程中擁有自己獨立的靜態實例,并且對其他線程不可見。這使得變量可以跨越多個函數調用和代碼塊,在整個程序運行期間保持其狀態和值不變。
需要注意的是,由于靜態線程本地存儲變量是線程特定的,因此它們的初始化和銷毀時機也與普通靜態變量不同。具體來說,在每個線程首次訪問該變量時會進行初始化,在線程結束時才會進行銷毀,而不是在程序啟動或運行期間進行一次性初始化或銷毀。
// 指向當前線程
static thread_local Thread *t_thread = nullptr;
// 指向線程名稱
static thread_local std::string t_thread_name = "UNKNOW";
- Thread(構造函數):初始化線程執行函數、線程名稱,創建新線程。
// thread:指向pthread_t類型的指針,用于返回新線程的ID。
// attr:指向pthread_attr_t類型的指針,該結構體包含一些有關新線程屬性的信息??梢詫⑵湓O置為NULL以使用默認值。
// start_routine:是指向新線程函數的指針,該函數將在新線程中運行。該函數必須采用一個void類型的指針作為參數,并返回一個void類型的指針。
// arg:是指向新線程函數的參數的指針。如果不需要傳遞參數,則可以將其設置為NULL。
Thread::Thread(std::function<void()> cb, const std::string &name)
: m_cb(cb)
, m_name(name) {
if (name.empty()) {
m_name = "UNKNOW";
}
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
<< " name=" << name;
throw std::logic_error("pthread_create error");
}
m_semaphore.wait();
}
調用pthread_create函數后,將會創建一個新線程,并開始執行通過start_routine傳遞給它的函數。新線程的ID將存儲在thread指向的變量中。請注意,新線程將在與調用pthread_create函數的線程并發執行的情況下運行。
- ~Thread(析構函數)
Thread::~Thread() {
if (m_thread) {
pthread_detach(m_thread);
}
}
- join(等待線程執行完成)
當調用 pthread_join() 時,當前線程會阻塞,直到指定的線程完成執行。一旦線程結束,當前線程就會恢復執行,并且可以通過 retval 參數來獲取線程的返回值。如果不關心線程的返回值,也可以將 retval 參數設置為 NULL。成功:返回 0 表示線程成功退出。
// thread:要等待的線程ID。
// retval:指向指針的指針,用于存儲線程返回的值。如果不需要獲取返回值,則可以將其設置為NULL。
void Thread::join() {
if (m_thread) {
int rt = pthread_join(m_thread, nullptr);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt=" << rt
<< " name=" << m_name;
throw std::logic_error("pthread_join error");
}
m_thread = 0;
}
}
- run(線程執行函數)
通過信號量,能夠確保構造函數在創建線程之后會一直阻塞,直到run方法運行并通知信號量,構造函數才會返回。
在構造函數中完成線程的啟動和初始化操作,可能會導致線程還沒有完全啟動就被調用,從而導致一些未知的問題。因此,在出構造函數之前,確保線程先跑起來,保證能夠初始化id,可以避免這種情況的發生。同時,這也可以保證線程的安全性和穩定性。
void *Thread::run(void *arg) {
Thread *thread = (Thread *)arg;
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
std::function<void()> cb;
cb.swap(thread->m_cb);
thread->m_semaphore.notify();
cb();
return 0;
}
總結
- 對日志系統的臨界資源進行互斥訪問時,使用自旋鎖而不是互斥鎖。
- mutex使用系統調用將線程阻塞,并等待其他線程釋放鎖后再喚醒它,這種方式適用于長時間持有鎖的情況。而spinlock在獲取鎖時忙等待,即不斷地檢查鎖狀態是否可用,如果不可用則一直循環等待,因此適用于短時間持有鎖的情況。
- 由于mutex會將線程阻塞,因此在高并發情況下可能會出現線程頻繁地進入和退出睡眠狀態,導致系統開銷大。而spinlock雖然不會使線程進入睡眠狀態,但會消耗大量的CPU時間,在高并發情況下也容易導致性能問題。
- 另外,當一個線程嘗試獲取已經被其他線程持有的鎖時,mutex會將該線程阻塞,而spinlock則會在自旋等待中消耗CPU時間。如果鎖的持有時間較短,則spinlock比mutex更適合使用;如果鎖的持有時間較長,則mutex比spinlock
- 在構造函數中創建子進程并等待其完成執行是一種常見的技術,可以通過信號量(Semaphore)來實現主線程等待子線程完成。
- 首先,在主線程中創建一個Semaphore對象并初始化為0。然后,在構造函數中創建子線程,并將Semaphore對象傳遞給子線程。子線程將執行所需的操作,并在最后使用Semaphore對象發出信號通知主線程它已經完成了工作。
- 主線程在構造函數中調用Semaphore對象的wait方法,這會使主線程阻塞直到收到信號并且Semaphore對象的計數器值大于0。當子線程發出信號時,Semaphore對象的計數器值增加1,因此主線程可以繼續執行構造函數的剩余部分。

浙公網安備 33010602011771號