Go學(xué)習(xí)筆記 - 使用jsonrpc進(jìn)行遠(yuǎn)程訪問
##JSON-RPC
----------
JSON-RPC是一個(gè)輕量級(jí)的遠(yuǎn)程調(diào)用協(xié)議,簡(jiǎn)單易用。
**請(qǐng)求數(shù)據(jù)體**:
{
"method": "getName",
"params": ["1"],
"id": 1
}
`method`: 遠(yuǎn)端的方法名
`params`: 遠(yuǎn)程方法接收的參數(shù)列表
`id`: 本次請(qǐng)求的標(biāo)識(shí)碼,遠(yuǎn)程返回時(shí)數(shù)據(jù)的標(biāo)識(shí)碼應(yīng)與本次請(qǐng)求的標(biāo)識(shí)碼相同
**返回?cái)?shù)據(jù)體**:
{
"result": {"id": 1, "name": "name1"},
"error": null,
"id": 1
}
`result`: 遠(yuǎn)程方法返回值
`error`: 錯(cuò)誤信息
`id`: 調(diào)用時(shí)所傳來的id
----------
##Go的rpc包
**`net/rpc`**
`net/rpc`包實(shí)現(xiàn)了最基本的rpc調(diào)用,它默認(rèn)通過`HTTP`協(xié)議傳輸`gob`數(shù)據(jù)來實(shí)現(xiàn)遠(yuǎn)程調(diào)用。
服務(wù)端實(shí)現(xiàn)了一個(gè)HTTP server,接收客戶端的請(qǐng)求,在收到調(diào)用請(qǐng)求后,會(huì)反序列化客戶端傳來的gob數(shù)據(jù),獲取要調(diào)用的方法名,并通過反射來調(diào)用我們自己實(shí)現(xiàn)的處理方法,這個(gè)處理方法傳入固定的兩個(gè)參數(shù),并返回一個(gè)error對(duì)象,參數(shù)分別為客戶端的請(qǐng)求內(nèi)容以及要返回給客戶端的數(shù)據(jù)體的指針。
**`net/rpc/jsonrpc`**
`net/rpc/jsonrpc`包實(shí)現(xiàn)了JSON-RPC協(xié)議,即實(shí)現(xiàn)了`net/rpc`包的`ClientCodec`接口與`ServerCodec`,增加了對(duì)json數(shù)據(jù)的序列化與反序列化。
----------
##Go JSON-RPC遠(yuǎn)程調(diào)用
客戶端與服務(wù)端雙方傳輸數(shù)據(jù),其中數(shù)據(jù)結(jié)構(gòu)必須得讓雙方都能處理。
首先定義rpc所傳輸?shù)臄?shù)據(jù)的結(jié)構(gòu),client端與server端都得用到。
// 需要傳輸?shù)膶?duì)象
type RpcObj struct {
Id int `json:"id"` // struct標(biāo)簽, 如果指定,jsonrpc包會(huì)在序列化json時(shí),將該聚合字段命名為指定的字符串
Name string `json:"name"`
}
// 需要傳輸?shù)膶?duì)象
type ReplyObj struct {
Ok bool `json:"ok"`
Id int `json:"id"`
Msg string `json:"msg"`
}
`RpcObj` 為傳輸?shù)臄?shù)據(jù)
`ReplyObj` 為服務(wù)端返回的數(shù)據(jù)
這兩個(gè)結(jié)構(gòu)體均可以在client和server端雙向傳遞
**服務(wù)端**
引入兩個(gè)包
"net/rpc"
"net/rpc/jsonrpc"
`net/rpc`實(shí)現(xiàn)了go的rpc框架,而`net/rpc/jsonrpc`則具體實(shí)現(xiàn)了JSON-RPC協(xié)議,具有json數(shù)據(jù)的序列化與反序列化功能。
實(shí)現(xiàn)處理器
// server端的rpc處理器
type ServerHandler struct {}
// server端暴露的rpc方法
func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error {
log.Println("server\t-", "recive GetName call, id:", id)
returnObj.Id = id
returnObj.Name = "名稱1"
return nil
}
// server端暴露的rpc方法
func (serverHandler ServerHandler) SaveName(rpcObj RpcObj, returnObj *ReplyObj) error {
log.Println("server\t-", "recive SaveName call, RpcObj:", rpcObj)
returnObj.Ok = true
returnObj.Id = rpcObj.Id
returnObj.Msg = "存儲(chǔ)成功"
return nil
}
`ServerHandler`結(jié)構(gòu)可以不需要什么字段,只需要有符合`net/rpc`server端處理器約定的方法即可。
符合約定的方法必須具備兩個(gè)參數(shù)和一個(gè)`error`類型的返回值
*第一個(gè)參數(shù)* 為client端調(diào)用rpc時(shí)交給服務(wù)器的數(shù)據(jù),可以是指針也可以是實(shí)體。`net/rpc/jsonrpc`的json處理器會(huì)將客戶端傳遞的json數(shù)據(jù)解析為正確的struct對(duì)象。
*第二個(gè)參數(shù)* 為server端返回給client端的數(shù)據(jù),必須為指針類型。`net/rpc/jsonrpc`的json處理器會(huì)將這個(gè)對(duì)象正確序列化為json字符串,最終返回給client端。
`ServerHandler`結(jié)構(gòu)需要注冊(cè)給`net/rpc`的HTTP處理器,HTTP處理器綁定后,會(huì)通過反射得到其暴露的方法,在處理請(qǐng)求時(shí),根據(jù)JSON-RPC協(xié)議中的`method`字段動(dòng)態(tài)的調(diào)用其指定的方法。
// 新建Server
server := rpc.NewServer()
// 開始監(jiān)聽,使用端口 8888
listener, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal("server\t-", "listen error:", err.Error())
}
defer listener.Close()
log.Println("server\t-", "start listion on port 8888")
// 新建處理器
serverHandler := &ServerHandler{}
// 注冊(cè)處理器
server.Register(serverHandler)
// 等待并處理鏈接
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal(err.Error())
}
// 在goroutine中處理請(qǐng)求
// 綁定rpc的編碼器,使用http connection新建一個(gè)jsonrpc編碼器,并將該編碼器綁定給http處理器
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
rpc server端大致的處理流程

**客戶端**
客戶端必須確保存在服務(wù)端在傳輸?shù)臄?shù)據(jù)中所使用的struct,在這里,必須確保客戶端也能使用`RpcObj`與`ReplyObj` struct。
client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時(shí)時(shí)間
if err != nil {
log.Fatal("client\t-", err.Error())
}
defer client.Close()
首先,通過`net`包使用TCP協(xié)議連接至服務(wù)器,這里設(shè)定了超時(shí)時(shí)間。
clientRpc := jsonrpc.NewClient(client)
然后使用`jsonrpc.NewClient`通過之前的TCP鏈接建立一個(gè)rpcClient實(shí)例。
對(duì)于`net/rpc`的客戶端,在遠(yuǎn)程調(diào)用是有同步(Synchronous)和異步(Asynchronous)兩種方式。不論那種方式,在源碼中,請(qǐng)求總是在一個(gè)新的goroutine中執(zhí)行,并且使用一個(gè)通道(chan)來存放服務(wù)器返回值。使用同步方式調(diào)用時(shí),調(diào)用方法內(nèi)部會(huì)等待chan的數(shù)據(jù),并一直阻塞直到遠(yuǎn)程服務(wù)器返回。而使用異步方式時(shí),客戶端的調(diào)用方法會(huì)直接將chan返回,這樣就可以適時(shí)的處理數(shù)據(jù)而不影響當(dāng)前goroutine。
下面是`net/rpc/client`中調(diào)用遠(yuǎn)程rpc的源碼
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
*同步調(diào)用的使用*
// 遠(yuǎn)程服務(wù)器返回的對(duì)象
var rpcObj RpcObj
// 請(qǐng)求數(shù)據(jù),rpcObj對(duì)象會(huì)被填充
clientRpc.Call("ServerHandler.GetName", 1, &rpcObj)
// 遠(yuǎn)程返回的對(duì)象
var reply ReplyObj
// 傳給遠(yuǎn)程服務(wù)器的對(duì)象參數(shù)
saveObj := RpcObj{2, "對(duì)象2"}
// 請(qǐng)求數(shù)據(jù)
clientRpc.Call("ServerHandler.SaveName", saveObj, &reply)
`Call`方法屬于同步方式的調(diào)用。第一個(gè)參數(shù)為Server端JSON-RPC處理器的類名加方法名,第二個(gè)參數(shù)為提交給遠(yuǎn)端服務(wù)器的數(shù)據(jù),第三個(gè)參數(shù)是服務(wù)器的返回?cái)?shù)據(jù),必須是指針。
*異步調(diào)用的使用*
// 傳給遠(yuǎn)程的對(duì)象
saveObj := RpcObj{i, "對(duì)象"}
// 異步的請(qǐng)求數(shù)據(jù)
divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil)
// 在一個(gè)新的goroutine中異步獲取遠(yuǎn)程的返回?cái)?shù)據(jù),并不阻塞當(dāng)前的goroutine
go func() {
reply := <-divCall.Done // 取出遠(yuǎn)程返回的數(shù)據(jù)
}()
`Call`方法屬于同步方式的調(diào)用。第一個(gè)參數(shù)為Server端JSON-RPC處理器的類名加方法名,第二個(gè)參數(shù)為提交給遠(yuǎn)端服務(wù)器的數(shù)據(jù),第三個(gè)參數(shù)是服務(wù)器的返回?cái)?shù)據(jù),必須是指針,第四個(gè)參數(shù)為一個(gè)通道,可以留空,留空的話它會(huì)幫忙建一個(gè),并保存在divCall中。
`net/rpc/jsonrpc/client`會(huì)把方法名與參數(shù)自動(dòng)序列化為json格式,其結(jié)構(gòu)如開頭所述的JSON-RPC結(jié)構(gòu)一樣,并自動(dòng)為JSON-RPC中的id賦值。而服務(wù)端返回的對(duì)象也會(huì)被正確的反序列化。
rpc client端大致的處理流程

----------
完整的程序
package main
import (
"net/rpc"
"net/rpc/jsonrpc"
"net"
"log"
)
// 需要傳輸?shù)膶?duì)象
type RpcObj struct {
Id int `json:"id"` // struct標(biāo)簽, 如果指定,jsonrpc包會(huì)在序列化json時(shí),將該聚合字段命名為指定的字符串
Name string `json:"name"`
}
// 需要傳輸?shù)膶?duì)象
type ReplyObj struct {
Ok bool `json:"ok"`
Id int `json:"id"`
Msg string `json:"msg"`
}
// server端的rpc處理器
type ServerHandler struct {}
// server端暴露的rpc方法
func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error {
log.Println("server\t-", "recive GetName call, id:", id)
returnObj.Id = id
returnObj.Name = "名稱1"
return nil
}
// server端暴露的rpc方法
func (serverHandler ServerHandler) SaveName(rpcObj *RpcObj, returnObj *ReplyObj) error {
log.Println("server\t-", "recive SaveName call, RpcObj:", rpcObj)
returnObj.Ok = true
returnObj.Id = rpcObj.Id
returnObj.Msg = "存儲(chǔ)成功"
return nil
}
// 開啟rpc服務(wù)器
func startServer() {
// 新建Server
server := rpc.NewServer()
// 開始監(jiān)聽,使用端口 8888
listener, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal("server\t-", "listen error:", err.Error())
}
defer listener.Close()
log.Println("server\t-", "start listion on port 8888")
// 新建處理器
serverHandler := &ServerHandler{}
// 注冊(cè)處理器
server.Register(serverHandler)
// 等待并處理鏈接
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal(err.Error())
}
// 在goroutine中處理請(qǐng)求
// 綁定rpc的編碼器,使用http connection新建一個(gè)jsonrpc編碼器,并將該編碼器綁定給http處理器
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
// 客戶端以同步的方式向rpc服務(wù)器請(qǐng)求
func callRpcBySynchronous() {
// 連接至服務(wù)器
client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時(shí)時(shí)間
if err != nil {
log.Fatal("client\t-", err.Error())
}
defer client.Close()
// 建立rpc通道
clientRpc := jsonrpc.NewClient(client)
// 遠(yuǎn)程服務(wù)器返回的對(duì)象
var rpcObj RpcObj
log.Println("client\t-", "call GetName method")
// 請(qǐng)求數(shù)據(jù),rpcObj對(duì)象會(huì)被填充
clientRpc.Call("ServerHandler.GetName", 1, &rpcObj)
log.Println("client\t-", "recive remote return", rpcObj)
// 遠(yuǎn)程返回的對(duì)象
var reply ReplyObj
// 傳給遠(yuǎn)程服務(wù)器的對(duì)象參數(shù)
saveObj := RpcObj{2, "對(duì)象2"}
log.Println("client\t-", "call SetName method")
// 請(qǐng)求數(shù)據(jù)
clientRpc.Call("ServerHandler.SaveName", saveObj, &reply)
log.Println("client\t-", "recive remote return", reply)
}
// 客戶端以異步的方式向rpc服務(wù)器請(qǐng)求
func callRpcByAsynchronous() {
// 打開鏈接
client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時(shí)時(shí)間
if err != nil {
log.Fatal("client\t-", err.Error())
}
defer client.Close()
// 建立rpc通道
clientRpc := jsonrpc.NewClient(client)
// 用于阻塞主goroutine
endChan := make(chan int, 15)
// 15次請(qǐng)求
for i := 1 ; i <= 15; i++ {
// 傳給遠(yuǎn)程的對(duì)象
saveObj := RpcObj{i, "對(duì)象"}
log.Println("client\t-", "call SetName method")
// 異步的請(qǐng)求數(shù)據(jù)
divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil)
// 在一個(gè)新的goroutine中異步獲取遠(yuǎn)程的返回?cái)?shù)據(jù)
go func(num int) {
reply := <-divCall.Done
log.Println("client\t-", "recive remote return by Asynchronous", reply.Reply)
endChan <- num
}(i)
}
// 15個(gè)請(qǐng)求全部返回時(shí)此方法可以退出了
for i := 1 ; i <= 15; i++ {
_ = <-endChan
}
}
func main() {
go startServer()
callRpcBySynchronous()
callRpcByAsynchronous()
}
----------
##總結(jié)
在使用`net/rpc/jsonrpc`時(shí)遇到這樣一個(gè)問題:
有多個(gè)client與一個(gè)server進(jìn)行rpc調(diào)用,而這些client又處于不同的內(nèi)網(wǎng),在server端需要獲取client端的公網(wǎng)IP。
按照`net/rpc`的實(shí)現(xiàn),在服務(wù)端處理器的自定義方法中只能獲取被反序列化的數(shù)據(jù),其他請(qǐng)求相關(guān)信息如client的IP只能在主goroutine的`net.Listener.Accept`中的`Conn`對(duì)象取得。
按源碼中的示例,每接收一個(gè)TCP請(qǐng)求都會(huì)在一個(gè)新的goroutine中處理,但是處理器的自定義方法都運(yùn)行在不同的goroutine中,這些回調(diào)的方法沒有暴露任何能獲取conn的字段、方法。
我是這樣解決的,在server端rpc處理器struct中放一個(gè)聚合字段,用于存儲(chǔ)ip地址的。
處理器被注冊(cè)與rpc server,全局只有一個(gè),在每次接受到tcp請(qǐng)求后,開啟一個(gè)goroutine,然后在goroutine內(nèi)部立即加上排斥鎖,然后再把請(qǐng)求的conn綁定給rpc server處理器,這樣,即能保證handler字段的線程安全,又能及時(shí)的相應(yīng)client的請(qǐng)求。
....
....
....
mutex := &sync.Mutex{}
// 等待鏈接
for {
// 相應(yīng)請(qǐng)求
conn, err := listener.Accept()
if err != nil {
log.Println(err.Error())
}
// 開啟一個(gè)goroutine來處理請(qǐng)求,緊接著等待下一個(gè)請(qǐng)求。
go func() {
// 應(yīng)用排斥鎖
mutex.Lock()
// 記錄ip地址
reciveHandler.Ip = strings.Split(conn.RemoteAddr().String(), ":")[0]
// 處理JSON-RPC調(diào)用
server.ServeCodec(jsonrpc.NewServerCodec(conn))
// 解鎖
mutex.Unlock()
}()
}
....
....
....
posted on 2013-08-15 00:14 黑暗伯爵 閱讀(7632) 評(píng)論(0) 收藏 舉報(bào)
浙公網(wǎng)安備 33010602011771號(hào)