基本事務(wù)操作:
任何數(shù)據(jù)庫(kù)都必須要保證一種原子執(zhí)行操作:最基本的原子執(zhí)行操作肯定是需要提供:
舉一個(gè)例子來(lái)說(shuō)明: 當(dāng)對(duì)某個(gè)Key 做一個(gè)統(tǒng)計(jì): 可能不同的Client做它那部分的統(tǒng)計(jì),一段時(shí)間后,服務(wù)器端需要得出那個(gè)key的具體值
Client1: GET number
number = number +N1;
SET number number+N1;
Client2: GET number
number = number +N2;
SET number number+N2;
原本來(lái)講 ,期望的值是NUMBER=NUMBER+N1+N2; 但是可能結(jié)果有其他的可能性,需要將上面的3個(gè)操作原子化,也就是這樣的操作流是一個(gè)完整體,而不讓pipeline被打亂~!
REDIS事務(wù)機(jī)制
像上述情況 必須得以解決 不然redis很難做?作者提供了2個(gè)事務(wù)機(jī)制
利用multi/exec來(lái)完成 multi被認(rèn)為是放在同一個(gè)序列中的,按照序列化去執(zhí)行命令操作
看看源碼是如何寫的:
MULTI操作
void multiCommand(redisClient *c) {
if (c->flags & REDIS_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= REDIS_MULTI;
addReply(c,shared.ok);
}
每個(gè)redisClient 在同一時(shí)間都只能壓入一個(gè)multi 首先檢測(cè)client是否含有multi標(biāo)記,如果沒有 就將標(biāo)記位置為REDIS_MULTI.
命令入隊(duì)操作
redis設(shè)計(jì)中,對(duì)于某個(gè)redis客戶端來(lái)講,server端首先需要查看是否含有MULTI標(biāo)記位【src/redis.c】來(lái)判斷是否該講multi命令寫入到待執(zhí)行隊(duì)列中~!如果滿足要求 就執(zhí)行下面函數(shù)體:
在執(zhí)行MULTI之后 都需要做 一個(gè)命令入隊(duì)操作:
【src/multi.c】
- void queueMultiCommand(redisClient *c) {
- multiCmd *mc;
- int j;
- c->mstate.commands = zrealloc(c->mstate.commands,
- sizeof(multiCmd)*(c->mstate.count+1));
- mc = c->mstate.commands+c->mstate.count;
- printf("mc:%p\n",mc);
- mc->cmd = c->cmd;
- mc->argc = c->argc;
- mc->argv = zmalloc(sizeof(robj*)*c->argc);
- printf("mc->argv:%d\n",sizeof(robj*)*c->argc);
- memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
- for (j = 0; j < c->argc; j++)
- incrRefCount(mc->argv[j]);
- c->mstate.count++;
- }
- typedef struct multiCmd {
- robj **argv;
- int argc;
- struct redisCommand *cmd;
- } multiCmd;
line 5:給mstate重新分配一個(gè)命令所需要的空間 預(yù)分配count+1的指針
line 6: 指針操作,定位到commands的count個(gè)命令指針
line11以后:申請(qǐng)c->argc個(gè)robj*地址,將c中的argv的內(nèi)容都復(fù)制給mc的argv中 這里的robj指針暫時(shí)沒有看懂~!![]()
![]()
執(zhí)行命令操作
執(zhí)行主體函數(shù)就是下面的
- void execCommand(redisClient *c) {
- int j;
- robj **orig_argv;
- int orig_argc;
- struct redisCommand *orig_cmd;
- int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
- if (!(c->flags & REDIS_MULTI)) {
- addReplyError(c,"EXEC without MULTI-----SFWTOMS");
- return;
- }
- /* Check if we need to abort the EXEC because:
- * 1) Some WATCHed key was touched.
- * 2) There was a previous error while queueing commands.
- * A failed EXEC in the first case returns a multi bulk nil object
- * (technically it is not an error but a special behavior), while
- * in the second an EXECABORT error is returned. */
- if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
- addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
- shared.nullmultibulk);
- discardTransaction(c);
- goto handle_monitor;
- }
- /* Exec all the queued commands */
- unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
- orig_argv = c->argv;
- orig_argc = c->argc;
- orig_cmd = c->cmd;
- addReplyMultiBulkLen(c,c->mstate.count);
- for (j = 0; j < c->mstate.count; j++) {
- c->argc = c->mstate.commands[j].argc;
- c->argv = c->mstate.commands[j].argv;
- c->cmd = c->mstate.commands[j].cmd;
- /* Propagate a MULTI request once we encounter the first write op.
- * This way we'll deliver the MULTI/..../EXEC block as a whole and
- * both the AOF and the replication link will have the same consistency
- * and atomicity guarantees. */
- if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
- execCommandPropagateMulti(c);
- must_propagate = 1;
- }
- call(c,REDIS_CALL_FULL);
- /* Commands may alter argc/argv, restore mstate. */
- c->mstate.commands[j].argc = c->argc;
- c->mstate.commands[j].argv = c->argv;
- c->mstate.commands[j].cmd = c->cmd;
- }
- c->argv = orig_argv;
- c->argc = orig_argc;
- c->cmd = orig_cmd;
- discardTransaction(c);
- /* Make sure the EXEC command will be propagated as well if MULTI
- * was already propagated. */
- if (must_propagate) server.dirty++;
- handle_monitor:
- /* Send EXEC to clients waiting data from MONITOR. We do it here
- * since the natural order of commands execution is actually:
- * MUTLI, EXEC, ... commands inside transaction ...
- * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
- * table, and we do it here with correct ordering. */
- if (listLength(server.monitors) && !server.loading)
- replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
- }
從line27開始: 逐步執(zhí)行每個(gè)指令
must_propagate 初始是0 經(jīng)過(guò)line 40-41的一個(gè)2重判斷:
發(fā)現(xiàn)c的命名不是只含有讀的成分,也就是含有寫的成分,execCommandPropagateMulti就成為必須的了
execCommandPropagateMulti 這個(gè)function是用來(lái)給所有的SLAVE和AOF文件發(fā)出這個(gè)命令, 要異步到SLAVE中 為啥要這樣做呢?
首先想下: SLAVE在所有讀操作是無(wú)需理會(huì)的,不會(huì)改變數(shù)據(jù) 寫操作是必須得處理的~! AOF也是同樣一個(gè)道理
而40-43只會(huì)執(zhí)行一次 所以也就有一個(gè)mush_propagate就只有一個(gè)變量
執(zhí)行45:調(diào)用命令
執(zhí)行完成之后就進(jìn)行銷毀命令 然后將dirty更改變量
所以可以看出 整個(gè)執(zhí)行過(guò)程不會(huì)去支持事務(wù)的回滾機(jī)制,不管命令M是否執(zhí)行成功 都不會(huì)影響M+1的一個(gè)執(zhí)行
REDIS樂觀鎖
利用上面的事務(wù)機(jī)制 你會(huì)發(fā)現(xiàn)上面的任務(wù)是最開始的那個(gè)例子也是無(wú)法完全避免的 REDIS提出一種樂觀鎖的機(jī)制: 【我個(gè)人認(rèn)為很有技巧性 】
check-and-set:檢查并且重新更新 什么意思呢 就是說(shuō)get 和set能夠同時(shí)保持本地原子性,不會(huì)被其他客戶端干擾~!
利用調(diào)用watch命令 來(lái)監(jiān)控redis某個(gè)數(shù)據(jù)庫(kù)的某些Key 確保某個(gè)Key是不會(huì)被其他客戶來(lái)修改 如果其他客戶想要修改 那么這個(gè)redis就會(huì)執(zhí)行一個(gè)任務(wù)失敗的操作
你會(huì)怎么寫呢?
首先要有一個(gè)監(jiān)控watchkey的數(shù)據(jù)結(jié)構(gòu) 存放被監(jiān)控的watchkey 如果每次做修改操作時(shí),不同redisClient來(lái)嘗試修改這個(gè)操作時(shí),應(yīng)該都會(huì)檢查是不是這個(gè)watchkey 如果是 就不讓其修改。
具體看下源代碼如何實(shí)現(xiàn):
這是WATCH的更底層實(shí)現(xiàn)
入口參數(shù):客戶端信息指針c
被監(jiān)控的key
line 8-13: 判斷是否是不是含有這個(gè)watchkey
如果含有這個(gè)key 就不需要監(jiān)控 映射同一個(gè)key2個(gè)條件: db相同 內(nèi)容相同
line 15 -20 如果沒有key 就添加這個(gè)key
注意line18:
watch_key是一個(gè)觀察key字典,把所有的被觀察的key都放在一個(gè)dic結(jié)構(gòu)體中。
怎么添加key和value呢?
key: 被觀察的key
value:client客戶端
這樣更加驗(yàn)證了前面用void*作為value的必要性啊~!
這里就又有一個(gè)watch_key 新的字典
加入樂觀鎖
- /* Watch for the specified key */
- void watchForKey(redisClient *c, robj *key) {
- list *clients = NULL;
- listIter li;
- listNode *ln;
- watchedKey *wk;
- /* Check if we are already watching for this key */
- listRewind(c->watched_keys,&li);
- while((ln = listNext(&li))) {
- wk = listNodeValue(ln);
- if (wk->db == c->db && equalStringObjects(key,wk->key))
- return; /* Key already watched */
- }
- /* This key is not already watched in this DB. Let's add it */
- clients = dictFetchValue(c->db->watched_keys,key);
- if (!clients) {
- clients = listCreate();
- dictAdd(c->db->watched_keys,key,clients);
- incrRefCount(key);
- }
- listAddNodeTail(clients,c);
- /* Add the new key to the list of keys watched by this client */
- wk = zmalloc(sizeof(*wk));
- wk->key = key;
- wk->db = c->db;
- incrRefCount(key);
- listAddNodeTail(c->watched_keys,wk);
- }
因?yàn)檫@里確實(shí)有點(diǎn)抽象,我利用圖標(biāo)加上文字來(lái)說(shuō)明:
Step 1:
尋找本客戶端是否還有相同的key
發(fā)現(xiàn)沒有相同的key Ok
進(jìn)行Step 2: 就講new_key里進(jìn)行加入到watch_keys的數(shù)據(jù)庫(kù)里
加入的方式是Key:就是watchkey
而value:是一個(gè)List指針 如果含有watchkey 則把redisClient加入到改List指針末尾
Step3: NewKey在第一步中的WATCH_Key中
touch樂觀鎖
如果觸碰到樂觀鎖,會(huì)怎么樣呢? 不管怎么樣,至少要保證一點(diǎn):不能再樂觀鎖解除之前執(zhí)行這個(gè)key的所有寫操作
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
c->flags |= REDIS_DIRTY_CAS;
}
}
在這里可以看出: 觸碰了之后,主調(diào)客戶端沒有失敗,而那么加鎖監(jiān)控的客戶端是失敗的
這里的touchWatchedKey只有看了被調(diào)用的例子才能理解真正的redis怎么處理這個(gè)觸碰樂觀鎖的情況:
當(dāng)Session1是沒有辦法執(zhí)行這個(gè)age的~!也就是chang-and-set操作
只要被watch的key在其他客戶端修改 而該客戶端也已經(jīng)進(jìn)入了multi,那么我們?cè)趍utli之間的操作將會(huì)無(wú)法做成功~!
通過(guò)源碼內(nèi)部看具體看下怎么實(shí)現(xiàn)的~
對(duì)于Session1 :
Step1:src/multi.c里的watchForKey(session1,age) 加入watch的key中
Step2:執(zhí)行multi函數(shù):準(zhǔn)備把接下來(lái)所有的命令都加入到QUEUE隊(duì)列中
Step3:Session2 執(zhí)行set(age,30) 這個(gè)時(shí)候set命令會(huì)查看這個(gè)是不是在watch_key里 到相應(yīng)的watch_key字典中,如果含有 那就傻逼了,就調(diào)用Db.c里的singalTtachKey(),改寫這個(gè)key的每個(gè)需要監(jiān)控的客戶端的REDIS_DIRTY_CAS字段為1
Step4:Session1就非常高興的調(diào)用execCommand(session1)結(jié)果一發(fā)現(xiàn)現(xiàn)在這個(gè)REDIS_DIRTY_CAS字段就是一個(gè)1,就全部不執(zhí)行 直接返回。
引入這個(gè)MULTI的原因
redis本身是一個(gè)單線程,按照常理來(lái)說(shuō),指令都是序列化的,一堆需要原子操作的命令放在服務(wù)器端執(zhí)行 也是按照順序往下執(zhí)行,Client A 和Client B 只需要一個(gè)或者加Watch 某個(gè)Key 不管有沒有multi命令是不是就確保了其會(huì)進(jìn)行原子操作呢? 在ClientA 和ClientB中,如果watch 了一個(gè)age,如果沒有multi,那么假設(shè)Client A加了watch 執(zhí)行了一個(gè)其中命令,而另外一個(gè)命令準(zhǔn)備執(zhí)行時(shí),ClientB就修改了這個(gè)age,而那個(gè)時(shí)候ClientA即使讀到REDIS_DIRTY_CAS為1 也起不到作用了,因?yàn)闆]辦法進(jìn)行事務(wù)的回滾操作~!
所以只能把操作放在隊(duì)列中,要么不執(zhí)行,要么一下子全部執(zhí)行完~!






浙公網(wǎng)安備 33010602011771號(hào)