Glusterfs之rpc模塊源碼分析(中)之Glusterfs的rpc模塊實現(1)
我的新浪微博:http://weibo.com/freshairbrucewoo。
歡迎大家相互交流,共同提高技術。
二、Glusterfs的rpc模塊實現
第一節、rpc服務器端實現原理及代碼分析
1.rpc服務初始化
Rpc服務的初始化工作在函數rpcsvc_init中實現的,實現代碼如下:
1 rpcsvc_t * rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) 2 3 { 4 5 rpcsvc_t *svc = NULL;//所有rpc服務的全局狀態描述對象 6 7 int ret = -1, poolcount = 0; 8 9 svc = GF_CALLOC (1, sizeof (*svc), gf_common_mt_rpcsvc_t);//分配內存資源 10 11 pthread_mutex_init (&svc->rpclock, NULL);//初始化鎖 12 13 INIT_LIST_HEAD (&svc->authschemes);//初始化權限模式鏈表 14 15 INIT_LIST_HEAD (&svc->notify);//初始化通知回調函數鏈表 16 17 INIT_LIST_HEAD (&svc->listeners);//初始化監聽鏈表 18 19 INIT_LIST_HEAD (&svc->programs);//初始化所有程序鏈表 20 21 ret = rpcsvc_init_options (svc, options);//初始化rpc服務的可選項信息 22 23 poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;//計算內存池大小 24 25 svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);//分配內存池空間 26 27 ret = rpcsvc_auth_init (svc, options);//初始化權限信息 28 29 svc->options = options;//可選項保存 30 31 svc->ctx = ctx;//所屬ctx 32 33 gluster_dump_prog.options = options;//讓描述程序的對象保存選項信息 34 35 ret = rpcsvc_program_register (svc, &gluster_dump_prog);//注冊rpc服務 36 37 return svc;//返回初始化后的所有rpc服務的全局描述對象 38 39 }
初始化的工作主要就是為描述一個所有rpc服務的全局對象設置一些初始化的值,這些信息一直保存到整個rpc服務結束,也就是glusterfs集群管理程序等主進程結束。
2.注冊回調通知函數rpcsvc_register_notify
其實這個也是算初始化的一部分,只不過它只初始化一種功能,就是注冊某些事件產生時的通知回調函數,通知的含義就是當這個事件發生了通知別的部分這里發生了事件,你應該根據這個事情做相應的處理,說簡單一點就是我這里發生的事件可能對你也有影響,所以通知你一下以便你也好做相應的處理。主要代碼如下:
1 wrapper = rpcsvc_notify_wrapper_alloc ();//為通知函數的包裝對象分配資源 2 3 svc->mydata = mydata;//設置屬于哪一個xlator 4 5 wrapper->data = mydata;//通知函數的包裝對象也保存屬于哪一個xlator 6 7 wrapper->notify = notify;//保存通知的回調函數 8 9 pthread_mutex_lock (&svc->rpclock);//svc是全局對象,訪問需要枷鎖 10 11 { 12 13 list_add_tail (&wrapper->list, &svc->notify);//添加到svc的通知函數列表 14 15 svc->notify_count++;//通知函數個數加1 16 17 } 18 19 pthread_mutex_unlock (&svc->rpclock);//釋放鎖
3.創建監聽器的函數rpcsvc_create_listeners
這個函數是重點中的重點,因為它的工作基本上把rpc服務建立起來了,就開始等待服務器的鏈接了,而且會加載底層的通信協議,主要包括rdma和tcp。所以這個函數做的工作會比較多,但是都很重要,下面詳細分析這個過程,先看這個函數實現的主要代碼:
1 rpcsvc_create_listener (svc, options, transport_name);
這個函數的實現大部分代碼都是在解析底層的傳輸類型,然后針對每一個解析出來的類型分別創建監聽程序,也就是上面那一句代碼。繼續看這個函數的實現,主要代碼如下:
1 trans = rpcsvc_transport_create (svc, options, name);//根據傳輸名稱創建傳輸描述對象 2 3 listener = rpcsvc_listener_alloc (svc, trans);//為監聽對象分配資源 4 5 if (!listener && trans) {//如果分配監聽資源失敗并且傳輸鏈接已建立成功 6 7 rpc_transport_disconnect (trans);//斷開傳輸鏈接 8 9 }
繼續看傳輸對象的創建函數rpcsvc_transport_create,主要代碼如下:
1 trans = rpc_transport_load (svc->ctx, options, name);//裝載傳輸層的庫并初始化 2 3 ret = rpc_transport_listen (trans);//建立傳輸層鏈接監聽,執行裝載庫后的listen函數 4 5 ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);//注冊通知函數 6 7 if ((ret == -1) && (trans)) { 8 9 rpc_transport_disconnect (trans);//斷開底層傳輸鏈接 10 11 trans = NULL; 12 13 }
上面調用的幾個函數就rpc_transport_load還有點意思,不過rpc_transport_listen函數執行了裝載后具體的協議(rdma和tcp)的listen函數來開始監聽客戶端的請求。先看看rpc_transport_load函數的主要實現代碼:
1 trans = GF_CALLOC (1, sizeof (struct rpc_transport), gf_common_mt_rpc_trans_t); 2 3 trans->name = gf_strdup (trans_name);//復制傳輸層的名字 4 5 trans->ctx = ctx;//傳輸所屬的ctx 6 7 type = str;//傳輸類型 8 9 is_tcp = strcmp (type, "tcp"); 10 11 is_unix = strcmp (type, "unix"); 12 13 is_ibsdp = strcmp (type, "ib-sdp"); 14 15 if ((is_tcp == 0) || (is_unix == 0) || (is_ibsdp == 0)) { 16 17 if (is_unix == 0)//如果是unix通信協議就設置unix協議族 18 19 ret = dict_set_str (options, "transport.address-family", "unix"); 20 21 if (is_ibsdp == 0)//如果是ib-sdb通信協議就設置inet-sdp協議族 22 23 ret = dict_set_str (options, "transport.address-family", "inet-sdp"); 24 25 ret = dict_set_str (options, "transport-type", "socket");//設置傳輸類型socket 26 27 } 28 29 } 30 31 ret = dict_get_str (options, "transport-type", &type);//設置傳輸類型選項 32 33 ret = gf_asprintf (&name, "%s/%s.so", RPC_TRANSPORTDIR, type);//格式化庫路徑字符串 34 35 handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL);//打開庫文件 36 37 trans->ops = dlsym (handle, "tops");//取ops函數組 38 39 trans->init = dlsym (handle, "init");//取初始化函數 40 41 trans->fini = dlsym (handle, "fini");//取析構函數 42 43 trans->reconfigure = dlsym (handle, "reconfigure");//取重新配置函數 44 45 vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t),gf_common_mt_volume_opt_list_t); 46 47 vol_opt->given_opt = dlsym (handle, "options");//取可選項數組 48 49 trans->options = options; 50 51 pthread_mutex_init (&trans->lock, NULL);//初始化鎖 52 53 trans->xl = THIS;//設置屬于哪一個xlator 54 55 ret = trans->init (trans);//執行初始化函數
由上面代碼可以看出,先動態加載了庫函數到對應的對象中去,最后執行了這個傳輸對象的初始化函數init,上面提到過注冊函數也是執行這里加載的一個listen函數,不過它是在一個函數數組集合里面ops,所以就是使用trans->ops->listen()來調用。下面就分析一下rdma和tcp傳輸協議的這兩個函數執行情況,先分析tcp的,因為tcp是比較熟悉的協議,tcp協議的init函數的主要代碼如下:
socket_init (this);
這個函數的代碼主要就是調用socket_init函數,而socket_init函數主要根據選項信息初始化一些socket對象描述的結構體,例如是否是非阻塞IO、鏈接超時等。繼續看看tcp的listen函數(在tcp實現為socket_listen),主要代碼如下:
1 //根據選項信息設置本地sockaddr結構信息(協議族、端口號等) 2 3 ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), 4 5 &sockaddr_len, &sa_family); 6 7 pthread_mutex_lock (&priv->lock);//加鎖 8 9 { 10 11 memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);//復制sockaddr地址 12 13 myinfo->sockaddr_len = sockaddr_len; 14 15 priv->sock = socket (sa_family, SOCK_STREAM, 0);//創建socket 16 17 //設置接收系統緩存區大小 18 19 if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, 20 21 &priv->windowsize,sizeof (priv->windowsize)) < 0) { 22 23 } 24 25 //設置發送數據系統緩沖區大小 26 27 if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, 28 29 &priv->windowsize,sizeof (priv->windowsize)) < 0) { 30 31 } 32 33 if (priv->nodelay) { 34 35 ret = __socket_nodelay (priv->sock);//設置無延遲 36 37 } 38 39 ret = __socket_server_bind (this);//設置地址重用并且綁定端口 40 41 if (priv->backlog) 42 43 ret = listen (priv->sock, priv->backlog);//設置并發接收數量 44 45 else 46 47 ret = listen (priv->sock, 10);//默認10個并發連接 48 49 rpc_transport_ref (this);//引用加1 50 51 //注冊讀取網絡事件 52 53 priv->idx = event_register (ctx->event_pool, priv->sock, 54 55 socket_server_event_handler, this, 1, 0); 56 57 } 58 59 pthread_mutex_unlock (&priv->lock);//解鎖
經過listen函數真個監聽服務就建立起來了,客戶端就可以發起請求了,客戶端的請求就由上面注冊的函數socket_server_event_handler處理,這個函數接收連接并且建立新的socket通信,并注冊另外一個函數進行數據傳輸的處理,基本上所有的網絡事件模型都是這個流程。下面繼續看rdma(rdm協議和工作原理相關內容就看附近)傳輸有什么不同的地方,還是從它的init函數開始分析,主要代碼如下:
1 rdma_private_t *priv = NULL;//定義一個rdma的私有數據描述對象 2 3 priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_rdma_private_t);//分配內存 4 5 this->private = priv;//私有數據保存到傳輸描述對象中 6 7 priv->sock = -1;//sock初始化為-1 8 9 if (rdma_init (this)) {//初始化infiniBand設備 10 11 }
這段代碼分配還內存資源以后就調用rdma_init初始化一些infiniBand相關內容,這些內容都是調用相應庫函數實現的,而且infiniBand需要特殊網絡設備才能支持(也需要相關的設備驅動程序)。當這些內容都初始化好了以后,后面的過程就和上面的完全tcp傳輸過程完全一樣,就是先注冊監聽端口等待客戶端的鏈接,當鏈接建立以后就注冊讀寫事件并在相應的注冊函數中處理相應的事件響應。
繼續回到rpcsvc_transport_create函數,當它執行了rpc_transport_load和rpc_transport_listen函數以后,基本基于配置協議的(tcp和rdma)的監聽程序都已初始化完畢并且開始監聽,不過還有最后一步就是注冊傳輸對象的通知回調函數,在函數rpc_transport_register_notify中實現,注冊的回調函數是rpcsvc_notify(就是把函數的地址保存到傳輸對象的notify成員中,把rpc狀態描述對象保存到mydata中),下面看看注冊的回調函數rpcsvc_notify的主要實現代碼,如下:
1 switch (event) { 2 3 case RPC_TRANSPORT_ACCEPT://新的客戶端鏈接請求已經被接收 4 5 new_trans = data; 6 7 ret = rpcsvc_accept (svc, trans, new_trans);//執行接收后的處理工作 8 9 break; 10 11 case RPC_TRANSPORT_DISCONNECT://鏈接被斷開 12 13 ret = rpcsvc_handle_disconnect (svc, trans); 14 15 break; 16 17 case RPC_TRANSPORT_MSG_RECEIVED://消息已經接收 18 19 msg = data; 20 21 ret = rpcsvc_handle_rpc_call (svc, trans, msg); 22 23 break; 24 25 case RPC_TRANSPORT_MSG_SENT://消息已經發送 26 27 break; 28 29 case RPC_TRANSPORT_CLEANUP://清理鏈接 30 31 listener = rpcsvc_get_listener (svc, -1, trans->listener);//取得listener 32 33 rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY,trans); 34 35 break; 36 37 }
主要的功能就是根據事件類型做相應的處理工作,相應的事件處理又有可能根據是哪一個監聽對象調用監聽對象以前保存的rpc狀態對象里面注冊的notify函數。
到此為止整個rpc服務基本上已經建立完成,開始提供rpc服務,不過具體提供哪些程序的服務還需要最后一步,就是在已經建立的rpc服務上注冊服務程序(就是提供客戶端調用的程序集,每一個程序里面提供多個函數),不過這些程序的注冊是在具體啟動這個rpc服務的地方進行注冊的,下面以glusterd程序啟動rpc服務為例,看注冊了哪些程序,注冊程序代碼如下:
1 ret = glusterd_program_register (this, rpc, &glusterd1_mop_prog); 2 3 ret = glusterd_program_register (this, rpc, &gd_svc_cli_prog); 4 5 ret = glusterd_program_register (this, rpc, &gd_svc_mgmt_prog); 6 7 ret = glusterd_program_register (this, rpc, &gluster_pmap_prog); 8 9 ret = glusterd_program_register (this, rpc, &gluster_handshake_prog);
Glusterd進程注冊了5個程序集,那么客戶端就可以請求這5個程序集里面的函數調用。一個完整的rpc服務就這樣完全建立了。
總結:可以看出整個rpc服務的建立過程還是比較復雜的,下面用一個完整的圖來解析整個rpc的建立過程,圖如下:

浙公網安備 33010602011771號