基于C的 - Redis哨兵模式客戶端實現
最近工作上需要用到內存數據庫 redis,架構設計使用redis的哨兵模式,也就是集群模式。
因為是用C開發,但是redis所提供的hiredis頭文件中并未提供有關集群模式或者哨兵模式調用的方式,前輩說可以參考一下java庫中的jedis的實現,然后有了這篇博客。
一、哨兵模式簡述
哨兵模式是一種特殊的模式,首先Redis提供了哨兵的命令,哨兵是一個獨立的進程,作為進程,它會獨立運行。
其原理是哨兵通過發送命令,等待Redis服務器響應,從而監控運行的多個Redis實例。
它的主要功能有以下幾點
1、通過發送命令,讓Redis服務器返回監控其運行狀態,包括主服務器和從服務器。
2、如果發現某個Redis節點運行出現狀況,能夠通知另外一個進程(例如它的客戶端);
3、當哨兵監測到Master宕機,能夠從Master的多個Slave中(至少存活一臺Slave)選舉一個來作為新的Master,其它的Slave節點會將它指定的Master的地址改為被提升為Master的Slave的新地址。
在使用過程中如果只使用一個哨兵進程對Redis服務器進行監控,可能會出現問題,為此,我們可以使用多個哨兵進行監控。各個哨兵之間還會進行監控,這樣就形成了多哨兵模式。
二、java客戶端實現方式:
因為對java并不是很熟悉,研究了很久jedis的包只是知道java通過配置三個哨兵端口,以及一個鏈接池實現了主從,但是一直沒弄明白jedis是如何進行主從路由的。
后面在一篇描述redis 哨兵模式詳解的博客里面看到了對java-redis客戶端的原理介紹,詳細可以看下面鏈接,6、7兩點。參考鏈接 : http://www.rzrgm.cn/myitnews/p/13732901.html
總結來說,jedis客戶端是通過遍歷所有的哨兵端口,找到任意一個可以連接上的哨兵,發送請求 get-master-addr-by-name 請求,確認Master節點,然后會向這個 Master節點發送role或info replication 命令,來確定其是否是Master節點,同時獲取slave節點的信息,然后存到了java的鏈接池中。后續使用的時候,就會通過鏈接池來進行操作了,若master掛掉了,會重復上面操作,重新查詢新的Master節點。
三、偷懶了的C語言實現方式:
一開始了解到java客戶端的實現方式后,我暫時陷入了一個比較困擾的境地。
用過C鏈接redis的讀者可能知道,redis提供的C語言動態庫libhiredis,其中并沒有提供直接鏈接集群模式、哨兵模式的鏈接池鏈接方法。libhiredis中提供的方式全部鏈接方式是與redis直連。所以支持redis的集群模式,至少會需要手動實現一個鏈接池。
就目前的需求而言,我需要滿足哨兵模式的支持,實現程序與redis哨兵模式的交互。
時間有限,從簡單的方式先實現需求,我至少需要實現以下幾點:
1、redis連接池(暫時不考慮哨兵查詢,直接與redis-server建立連接)
2、連接池能夠實現主從鑒別(根據主從讀寫分離進行判斷)
3、需要支持高并發場景(建立長連接,避免重復重連影響效率)
建立單獨連接節點及連接池,使用的是靜態的變量保存的連接池。
//連接節點及相關信息
typedef struct connNode{ char ip[200]; int port; redisContext * conn; }conn_node;
//redis節點池,因為需求使用的是3臺redis,一主二從,設置當前最大為八臺機器 typedef struct redisNode{ int size; /*總機器數*/ int master; /*主機序號*/ int slaver; /*從機序號* conn_node nodeInfo[8];/*連接節點列表*/ }redisconn;
static redisconn connList; /*靜態連接池,保持長連接*/
初始化連接池
/******************************************** * 建立redis鏈接 * 支持單機版與哨兵模式版本 通過linux環境變量配置 .bash_profile * 單機版本,同一臺機器進行讀寫 * 哨兵模式,一主多從,主機寫高可用,從機器只可讀 * 通過插入測試值方式對哨兵模式下主機進行甄別,并記錄主機 *********************************************/
int redis_init() { char addr[1024]; char *p,*pAddr; int i,len; memset(addr, 0, sizeof(addr)); if (getenv("REDIS_ADDR") == NULL) { PTLOG_FILE("redis節環境變量未配置;【REDIS_ADDR】"); return -1; } snprintf(addr, sizeof(addr), "%s,", getenv("REDIS_ADDR")); /*初始化前需要將redis中的連接對象釋放掉,否則不會關閉句柄,也可能內存泄漏*/ for(i=0;i<8;i++){ if(connList.nodeInfo[i].conn != NULL){ redisFree(connList.nodeInfo[i].conn); } } memset(&connList,0,sizeof(connList)); //哨兵模式,讀寫分離,初始化主從機器均為 -1 connList.master = -1; connList.slaver = -1; connList.size=0; p = strchr(addr, ','); //環境變量,配置逗號分隔,表示多個 if(p != NULL){//多臺redis,哨兵模式 p=addr; len = strlen(addr); for(i = 0 ; i< len ; i++){ if(addr[i]==','){ pAddr = addr + i; memset(connList.nodeInfo[connList.size].ip,0,200); snprintf(connList.nodeInfo[connList.size].ip,(pAddr - p + 1),"%s",p); p = addr + i + 1; //查詢端口信息 pAddr = strrchr(connList.nodeInfo[connList.size].ip, ':'); if (pAddr == NULL || (connList.nodeInfo[connList.size].port = atoi(pAddr+1)) <= 0) { //端口有誤,跳過當前配置,不進行計數 PTLOG_FILE("環境變量 REDIS_ADDR 配置有誤:[%s]",connList.nodeInfo[connList.size].ip); continue ; } *pAddr = '\0'; //建立當前連接,存入連接列表中 connList.nodeInfo[connList.size].conn = redisConnect(connList.nodeInfo[connList.size].ip, connList.nodeInfo[connList.size].port); //建立連接失敗,不進行計數,否則后續會有問題 if (connList.nodeInfo[connList.size].conn == NULL) { *pAddr = ':'; PTLOG_FILE("[%s],%s",connList.nodeInfo[connList.size].ip,strerror(errno)); continue ; } else if (connList.nodeInfo[connList.size].conn->err) { *pAddr = ':'; PTLOG_FILE("redis連接失敗:[%s] error %d:%s",connList.nodeInfo[connList.size].ip, connList.nodeInfo[connList.size].conn->err, connList.nodeInfo[connList.size].conn->errstr); if(connList.nodeInfo[connList.size].conn != NULL){ redisFree(connList.nodeInfo[connList.size].conn); } continue ; } /* 建立長連接 KeepAlive*/ redisEnableKeepAlive(connList.nodeInfo[connList.size].conn); //連接列表數量++ connList.size++; } } //初始化主從機器,公共連接默認為主連接 conn = connAsMaster(); connAsSlaver(); }else{ //單機器模式,主從均為同一臺機器 connList.master = -1; connList.slaver = -1; connList.size=0; conn = connAsSingle( 0 ); return 1; } //默認連接master連接 conn = connList.nodeInfo[connList.master].conn; return connList.size; }
將所有連接建立后,需要校驗哪一臺是主機,哪一臺是從機,目前使用的方法是指定一臺為專門寫的主機,指定一臺從機為專門讀的從機。
目前實現方法,根據環境變量配置查找,查找主機從前往后查,查找從機從后往前查,當主機經過多次掛機重啟之后,有可能會出現最后一臺為主機的情況,該情況會使得讀寫在同一臺機器上。(可優化)
/********************************************** * redis哨兵模式讀寫分離,master機器寫高可用,slaver不能進行寫操作,需要選擇主機進行寫入值,如果主機參數不為-1,則說明已經經過初始化,并且已經確定主機,直接返回主機連接 **********************************************/
redisContext* connAsMaster(){ redisReply *reply; int i; if(connList.master == -1){ for( i = 0 ; i < connList.size; i++ ){ reply = redisCommand(connList.nodeInfo[i].conn, "set %s %s", "redis_master_key", "1");//測試插入值 if (reply == NULL || reply->type == REDIS_REPLY_ERROR || connList.nodeInfo[i].conn->err) { if (reply != NULL) { freeReplyObject(reply); }else { /*redis連接斷開情況,返回值為NULL,重連再次執行一次*/ if(connList.nodeInfo[i].conn!=NULL){ redisFree(connList.nodeInfo[i].conn); } connList.nodeInfo[i].conn = redisConnect(connList.nodeInfo[i].ip,connList.nodeInfo[i].port); reply = redisCommand(connList.nodeInfo[i].conn, "set %s %s", "redis_master_key", "1");//測試插入值 if (!(reply == NULL && reply->type == REDIS_REPLY_ERROR && connList.nodeInfo[i].conn->err)){ freeReplyObject(reply); } } continue; } connList.master = i; break; } } if(connList.master == -1){//無可用連接 PTLOG_FILE("FAIL:[無可用連接]"); return NULL; } return connList.nodeInfo[connList.master].conn; }
/********************************************** * 連接從機器,通過查詢主機寫入的值,查詢成功則選定為從機,如果從機參數不為-1,則說明已經經過初始化,并確定從機,直接返回從機連接 **********************************************/
redisContext* connAsSlaver(){ int i; redisReply *reply; if(connList.slaver == -1){ for( i = connList.size - 1 ; i >= 0 ; i-- ){ reply = redisCommand(connList.nodeInfo[i].conn, "get redis_master_key "); if (reply == NULL || reply->type == REDIS_REPLY_ERROR || connList.nodeInfo[i].conn->err) { if (reply != NULL) { freeReplyObject(reply); }else { /*redis連接斷開情況,返回值為NULL,重連再次執行一次*/ if(connList.nodeInfo[i].conn!=NULL){ redisFree(connList.nodeInfo[i].conn); } connList.nodeInfo[i].conn = redisConnect(connList.nodeInfo[i].ip,connList.nodeInfo[i].port); reply = redisCommand(connList.nodeInfo[i].conn, "get redis_master_key ");//測試插入值 if (!(reply == NULL && reply->type == REDIS_REPLY_ERROR && connList.nodeInfo[i].conn->err)){ freeReplyObject(reply); } } continue; } connList.slaver = i; break; } } if(connList.slaver == -1){//無可用連接 PTLOG_FILE("FAIL:[無可用連接]"); return NULL; } return connList.nodeInfo[connList.slaver].conn; }
四、復盤反思
當初趕進度兩個禮拜要完成開發測試,包括熟悉jedis的實現方式,時間實在趕就沒有去深入思考怎么實現更合適。
當然上面成功實現了redis集群模式的支持,但是還是有很多可以進行改進的方式。
簡單舉個例子:上述實現沒有考慮redis的密碼模式(雖然是需求上沒有提及,沒實現也沒問題。)說白了,就是沒考慮到!是 bug! ORZ
有個小插曲:測試在測代碼的時候,問了我一個問題,他說他之前測試的jar包使用了redis的依賴,在配置文件中需要配置redis的節點并不是redis-server的端口,而是sentinel端口,而我是通過直接連接redis實現的,有什么區別。
其實這就是我這個實現與jedis客戶端的區別了。
按照jedis客戶端的實現,連接確實是配置sentinel,然后需要通過sentinel查詢master機器。
127.0.0.1:26379> SENTINEL get-master-addr-by-name mymaster
1) "127.0.0.1"
2) "6379"
mymaster是在進行集群配置的時候,寫在sentinel.conf中的主機名稱。通過這個主機名稱可以查出主機的ip和port
然后建立指向主機的連接,通過命令 role 或者 info replication查看當前機器是否為Master,并查看其從節點。從而來建立從節點的連接。
再進一步,對于高并發查詢的場景,可以將從節點進行一個負載均衡,避免大量查詢在一個從節點上。(官方數據表示Redis讀的速度是110000次/秒,寫的速度是81000次/秒。跑~~~
127.0.0.1:6380> role
1) "master"
2) (integer) 73735184
3) 1) 1) "127.0.0.1"
2) "6379"
3) "73735184"
2) 1) "127.0.0.1"
2) "6381"
3) "73735184"
127.0.0.1:6380> INFO replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6379,state=online,offset=73735982,lag=1
slave1:ip=127.0.0.1,port=6381,state=online,offset=73735982,lag=1
master_repl_offset:73736115
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:72687540
repl_backlog_histlen:1048576
在最開始開發的時候,看到不存在哨兵連接的接口,我甚至認為C語言不支持哨兵偵測,但是經過熟悉了解后,我還是naive了~
有時間碼一個,實現一下(挖坑~
總結:其實很多情況并不是無法實現,而是缺乏思考。
代碼千萬條,思考第一條,開發不規范,bug碼里藏。
要沉淀每一次的思考,下次代碼能寫得更好。

浙公網安備 33010602011771號