Linux中讀寫自旋鎖rwlock的實現 - 詳解
文章目錄
- 一、判斷讀寫鎖是否鎖定`rwlock_is_locked`
- 二、讀寫鎖初始化`rwlock_init`
- 三、嘗試加寫鎖的函數`_raw_write_trylock`
- 四、寫鎖獲取失敗后的自旋等待`__write_lock_failed`
- 五、寫鎖獲取的內聯匯編實現`__build_write_lock`
- 六、非搶占式加寫鎖`_raw_write_lock`
- 七、搶占式加寫鎖`__preempt_write_lock`
- 八、加寫鎖包裝函數`_write_lock`
- 九、解除寫鎖`_write_unlock`
- 十、讀鎖獲取失敗后的自旋等待`__read_lock_failed`
- 十一、`__build_read_lock`
- 十二、`_raw_read_lock`
- 十三、`_read_lock`
- 十四、解除讀鎖`_read_unlock`
- 十五、總結
一、判斷讀寫鎖是否鎖定rwlock_is_locked
#define RW_LOCK_BIAS 0x01000000
#define rwlock_is_locked(x) ((x)->lock != RW_LOCK_BIAS)
RW_LOCK_BIAS
- 解鎖狀態的標志位
rwlock_is_locked
- 如果
lock和解鎖標志位一致則返回0,表示未鎖定 - 不一致返回1,表示已鎖定
二、讀寫鎖初始化rwlock_init
#define RW_LOCK_BIAS 0x01000000
typedef struct {
volatile unsigned int lock;
} rwlock_t;
#define RW_LOCK_UNLOCKED (rwlock_t) { RW_LOCK_BIAS }
#define rwlock_init(x) do { *(x) = RW_LOCK_UNLOCKED; } while(0)
RW_LOCK_UNLOCKED
- 將讀寫鎖初始化為未鎖定狀態
rwlock_init
- 動態初始化,x是一個讀寫鎖結構體的指針
三、嘗試加寫鎖的函數_raw_write_trylock
#define RW_LOCK_BIAS 0x01000000
static inline int _raw_write_trylock(rwlock_t *lock)
{
atomic_t *count = (atomic_t *)lock;
if (atomic_sub_and_test(RW_LOCK_BIAS, count))
return 1;
atomic_add(RW_LOCK_BIAS, count);
return 0;
}
該函數是不阻塞加寫鎖函數
(atomic_t *)lock
- 對
lock進行強轉,因為rwlock_t結構體和atomic_t結構體的成員類型是一致的
atomic_sub_and_test
- 將
count減去RW_LOCK_BIAS - 如果結果為0則返回1
- 否則返回0
atomic_add(RW_LOCK_BIAS, count);
- 將減去的值加回去,恢復原始狀態
四、寫鎖獲取失敗后的自旋等待__write_lock_failed
asm(
".section .sched.text\n"
".align 4\n"
".globl __write_lock_failed\n"
"__write_lock_failed:\n\t"
LOCK "addl $" RW_LOCK_BIAS_STR ",(%eax)\n"
"1: rep; nop\n\t"
"cmpl $" RW_LOCK_BIAS_STR ",(%eax)\n\t"
"jne 1b\n\t"
LOCK "subl $" RW_LOCK_BIAS_STR ",(%eax)\n\t"
"jnz __write_lock_failed\n\t"
"ret"
);
1.函數整體功能
__write_lock_failed:
- 場景: 當嘗試獲取寫鎖失敗時調用
- 目的: 自旋等待直到獲取寫鎖成功
- 調用約定: 鎖的地址通過
eax寄存器傳遞
2.代碼逐行分析
2.1. 恢復鎖狀態
LOCK "addl $" RW_LOCK_BIAS_STR ",(%eax)"
作用: 撤銷之前失敗的寫鎖獲取嘗試
2.2. 自旋等待循環
"1: rep; nop\n\t" // 短暫的暫停,節約功耗
"cmpl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" // 檢查鎖是否空閑
"jne 1b\n\t" // 如果不空閑,繼續循環
等待條件: lock == RW_LOCK_BIAS
- 這表示鎖處于完全空閑狀態
- 沒有讀者,也沒有寫者
2.3. 再次嘗試獲取寫鎖
LOCK "subl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" // 嘗試獲取寫鎖
"jnz __write_lock_failed\n\t" // 如果失敗,重試
"ret" // 成功,返回
關鍵操作:
lock -= RW_LOCK_BIAS- 如果結果為0:獲取成功
- 如果結果非0:獲取失敗,重新開始
五、寫鎖獲取的內聯匯編實現__build_write_lock
#define RW_LOCK_BIAS_STR "0x01000000"
#define __build_write_lock_ptr(rw, helper) \
asm volatile(LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
"jz 1f\n" \
"call " helper "\n\t" \
"1:\n" \
::"a" (rw) : "memory")
#define __build_write_lock(rw, helper) do { \
if (__builtin_constant_p(rw)) \
__build_write_lock_const(rw, helper); \
else \
__build_write_lock_ptr(rw, helper); \
} while (0)
1.宏定義分析
1.1. 基礎常量
#define RW_LOCK_BIAS_STR "0x01000000"
這是表示讀寫鎖空閑標志的字符串形式,用于內聯匯編
1.2. 指針版本的寫鎖獲取
#define __build_write_lock_ptr(rw, helper) \
asm volatile(LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
"jz 1f\n" \
"call " helper "\n\t" \
"1:\n" \
::"a" (rw) : "memory")
2.匯編代碼詳細解析
LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" # 原子減BIAS
"jz 1f\n" # 如果結果為0,跳轉到標簽1
"call " helper "\n\t" # 否則調用helper函數
"1:\n" # 標簽1:繼續執行
2.1執行流程
情況1: 快速路徑(獲取成功)
初始: lock = RW_LOCK_BIAS (0x01000000) - 鎖空閑
執行: lock -= RW_LOCK_BIAS → lock = 0
條件: 結果=0 → jz跳轉 → 跳過call指令
結果: 直接成功,無需函數調用
情況2: 慢速路徑(獲取失敗)
初始: lock < RW_LOCK_BIAS - 鎖被占用
執行: lock -= RW_LOCK_BIAS → lock ≠ 0
條件: 結果≠0 → 不跳轉 → 執行call指令
結果: 調用helper函數處理競爭
3.輸入約束和破壞描述符
::"a" (rw) : "memory"
"a" (rw): 輸入操作數,要求rw(鎖指針)放入eax寄存器"memory": 內存破壞描述符,確保內存訪問順序
4.編譯時常數優化
if (__builtin_constant_p(rw))
__build_write_lock_const(rw, helper);
else
__build_write_lock_ptr(rw, helper);
__builtin_constant_p() 是GCC內置函數:
- 如果
rw是編譯時常數,使用優化版本 - 如果
rw是變量,使用指針版本
六、非搶占式加寫鎖_raw_write_lock
static inline void _raw_write_lock(rwlock_t *rw)
{
__build_write_lock(rw, "__write_lock_failed");
}
調用__build_write_lock函數,指定獲取寫鎖失敗時調用__write_lock_failed
七、搶占式加寫鎖__preempt_write_lock
static inline void __preempt_write_lock(rwlock_t *lock)
{
if (preempt_count() > 1) {
_raw_write_lock(lock);
return;
}
do {
preempt_enable();
while (rwlock_is_locked(lock))
cpu_relax();
preempt_disable();
} while (!_raw_write_trylock(lock));
}
1.函數功能
static inline void __preempt_write_lock(rwlock_t *lock)
- 目的: 在支持內核搶占的環境中安全獲取寫鎖
- 特點: 在等待鎖時允許被搶占
2.代碼邏輯分析
2.1.快速路徑檢查
if (preempt_count() > 1) {
_raw_write_lock(lock);
return;
}
preempt_count() 含義:
preempt_count() = 0: 可搶占狀態preempt_count() > 0: 不可搶占狀態(在中斷、軟中斷、持有自旋鎖等)
條件解釋:
- 如果
preempt_count() > 1,說明已經在原子上下文中 - 此時不能被搶占,直接使用非搶占版本的鎖獲取
2.2. 慢速路徑循環
do {
preempt_enable(); // 允許搶占
while (rwlock_is_locked(lock)) // 檢查鎖是否被占用
cpu_relax(); // 等待時讓CPU進入低功耗
preempt_disable(); // 禁止搶占
} while (!_raw_write_trylock(lock)); // 嘗試獲取鎖,不阻塞
2.3.場景1: 在原子上下文中
進程A: preempt_count() = 2 (持有自旋鎖)
調用 __preempt_write_lock():
- 條件成立,直接調用 _raw_write_lock()
- 可能自旋等待,但不會被搶占
2.4.場景2: 在進程上下文中,鎖空閑
進程A: preempt_count() = 0
調用 __preempt_write_lock():
第一次循環:
preempt_enable() → 允許搶占
rwlock_is_locked() → false (鎖空閑)
preempt_disable() → 禁止搶占
_raw_write_trylock() → 成功獲取,退出循環
2.5.場景3: 在進程上下文中,鎖被占用
進程A: preempt_count() = 0
調用 __preempt_write_lock():
第一次循環:
preempt_enable() → 允許搶占
while循環: 發現鎖被占用 → cpu_relax() 等待
// 在等待期間可能被更高優先級進程搶占!
被喚醒后繼續等待...
鎖釋放后:
preempt_disable() → 禁止搶占
_raw_write_trylock() → 可能失敗(競態條件)
第二次循環: 重試...
八、加寫鎖包裝函數_write_lock
void __lockfunc _write_lock(rwlock_t *lock)
{
preempt_disable();
if (unlikely(!_raw_write_trylock(lock)))
__preempt_write_lock(lock);
}
preempt_disable();
- 先禁用內核搶占,表示這個函數加寫鎖不允許搶占
_raw_write_trylock(lock)
- 嘗試獲取寫鎖
- 如果失敗則調用加寫鎖函數
__preempt_write_lock - 因為當前處于原子上下文中,所以后面必然調用
_raw_write_lock函數
九、解除寫鎖_write_unlock
#define _raw_write_unlock(rw) asm volatile("lock ; addl $" RW_LOCK_BIAS_STR ",%0":"=m" ((rw)->lock) : : "memory")
void __lockfunc _write_unlock(rwlock_t *lock)
{
_raw_write_unlock(lock);
preempt_enable();
}
_raw_write_unlock
- 將解釋標志加回到
lock - 啟用搶占
十、讀鎖獲取失敗后的自旋等待__read_lock_failed
asm(
".section .sched.text\n"
".align 4\n"
".globl __read_lock_failed\n"
"__read_lock_failed:\n\t"
LOCK "incl (%eax)\n"
"1: rep; nop\n\t"
"cmpl $1,(%eax)\n\t"
"js 1b\n\t"
LOCK "decl (%eax)\n\t"
"js __read_lock_failed\n\t"
"ret"
);
1.函數整體功能
__read_lock_failed:
- 場景: 當嘗試獲取讀鎖失敗時調用
- 目的: 自旋等待直到成功獲取讀鎖
- 調用約定: 鎖的地址通過
eax寄存器傳遞
2.代碼逐行分析
2.1. 恢復鎖狀態
LOCK "incl (%eax)\n"
作用: 撤銷之前失敗的讀鎖獲取嘗試
執行前:
- 讀鎖嘗試:
lock-- - 但發現鎖被寫者占用(鎖值 <= 0),需要恢復
執行后:
lock++恢復鎖計數- 回到嘗試獲取之前的狀態
2.2. 自旋等待循環
"1: rep; nop\n\t" // 短暫的暫停,節約功耗
"cmpl $1,(%eax)\n\t" // 比較鎖值和1
"js 1b\n\t" // 如果鎖值 < 0,繼續循環
等待條件: lock >= 0
js(Jump if Sign)在結果為負時跳轉- 所以循環條件是:鎖值 == 0(有寫者持有鎖)
- 退出條件是:鎖值 > 0(沒有寫者)
2.3. 再次嘗試獲取讀鎖
LOCK "decl (%eax)\n\t" // 嘗試獲取讀鎖
"js __read_lock_failed\n\t" // 如果失敗,重試
"ret" // 成功,返回
關鍵操作:
lock--嘗試減少鎖計數(獲取讀鎖)- 如果結果 < 0:獲取失敗(寫者出現),重新開始
- 如果結果 >= 0:獲取成功,返回
十一、__build_read_lock
#define __build_read_lock_ptr(rw, helper) \
asm volatile(LOCK "subl $1,(%0)\n\t" \
"jns 1f\n" \
"call " helper "\n\t" \
"1:\n" \
::"a" (rw) : "memory")
#define __build_read_lock(rw, helper) do { \
if (__builtin_constant_p(rw)) \
__build_read_lock_const(rw, helper); \
else \
__build_read_lock_ptr(rw, helper); \
} while (0)
1.匯編代碼詳細解析
LOCK "subl $1,(%0)\n\t" # 原子減1
"jns 1f\n" # 如果結果非負,跳轉到標簽1
"call " helper "\n\t" # 否則調用helper函數
"1:\n" # 標簽1:繼續執行
情況1: 快速路徑(獲取成功)
初始: lock > 0 (沒有寫者持有鎖)
執行: lock -= 1 → lock >= 0
條件: 結果非負 → jns跳轉 → 跳過call指令
結果: 直接成功,無需函數調用
情況2: 慢速路徑(獲取失敗)
初始: lock == 0 (寫者持有鎖)
執行: lock -= 1 → lock < 0 (負)
條件: 結果為負 → jns不跳轉 → 執行call指令
結果: 調用helper函數處理競爭
十二、_raw_read_lock
static inline void _raw_read_lock(rwlock_t *rw)
{
#ifdef CONFIG_DEBUG_SPINLOCK
BUG_ON(rw->magic != RWLOCK_MAGIC);
#endif
__build_read_lock(rw, "__read_lock_failed");
}
- 調用
__build_read_lock - 指定獲取失敗時調用
__read_lock_failed函數
十三、_read_lock
void __lockfunc _read_lock(rwlock_t *lock)
{
preempt_disable();
_raw_read_lock(lock);
}
- 禁用搶占
- 獲取讀鎖
十四、解除讀鎖_read_unlock
#define _raw_read_unlock(rw) asm volatile("lock ; incl %0" :"=m" ((rw)->lock) : : "memory")
void __lockfunc _read_unlock(rwlock_t *lock)
{
_raw_read_unlock(lock);
preempt_enable();
}
_raw_read_unlock
- 給
lock加1,恢復初始 - 啟用搶占
十五、總結
通過確定讀寫鎖的初始值RW_LOCK_BIAS為0x01000000,其次加一次寫鎖就直接減去0x01000000,而加一次讀鎖只減1,這樣就可以寫鎖只能加一次,再加其他鎖lock就變負了,而讀鎖可以加0x01000000這么多次
浙公網安備 33010602011771號