Java 實現簡單的 RPC 框架
RPC 簡介
RPC,全稱為 Remote Procedure Call,即遠程過程調用,它是一個計算機通信協議。它允許像調用本地服務一樣調用遠程服務。它可以有不同的實現方式,而不需要了解底層網絡技術的協議。 RPC 協議假定某些傳輸協議的存在,如 TCP 或 UDP,為通信程序之間攜帶信息數據。如 RMI(遠程方法調用)、Hessian、Http invoker 等。
怎樣實現一個 RPC 框架
RPC 能夠讓本地應用簡單、高效地調用服務器中的過程。它主要應用在分布式系統。如 Hadoop 中的 IPC 組件。但怎樣實現一個 RPC 框架呢?
可以從下面幾個方面思考:
- 通信模型:假設通信的為 A 機器與 B 機器,A 與 B 之間有通信模型,在 Java 中一般基于 BIO 或 NIO。
- 過程(服務)定位:使用給定的通信方式,與確定 IP 與端口及方法名稱確定具體的過程或方法;
- 遠程代理對象:本地調用的方法(服務)其實是遠程方法的本地代理,因此可能需要一個遠程代理對象,對于 Java 而言,遠程代理對象可以使用 Java 的動態對象實現,封裝了調用遠程方法調用;
- 序列化,將對象名稱、方法名稱、參數等對象信息進行網絡傳輸需要轉換成二進制傳輸,這里可能需要不同的序列化技術方案。如:protobuf,Arvo 等。
RPC 框架架構
RPC 架構分為三部分:
- 服務提供者,運行在服務器端,提供服務接口定義與服務實現類。
- 服務中心,運行在服務器端,負責將本地服務發布成遠程服務,管理遠程服務,提供給服務消費者使用。
- 服務消費者,運行在客戶端,通過遠程代理對象調用遠程服務。
RPC 框架的簡單實現
這里我只介紹服務提供者和客戶端的實現方式。
服務提供者
服務提供者 IHello 接口定義:
public interface IHello {
String sayHello(String string);
}
服務提供者 IHello 接口實現:
public class HelloImpl implements IHello {
@Override
public String sayHello(String string) {
return "Hello:" + string;
}
}
服務端 RpcProxyServer 類:
public class RpcProxyServer {
ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher(Object service, int port) {
ServerSocket serverSocket = null;
try {
// 啟動 socket 服務
serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept();
executorService.execute(new ProcessorHandler(service, socket));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務端 RpcRequest 類:
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 383378368319625542L;
private String className;
private String methodName;
private Object[] params;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
@Override
public String toString() {
return "RpcRequest{" +
"className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", params=" + Arrays.toString(params) +
'}';
}
}
服務端 ProcessorHandler 類:
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
public class ProcessorHandler implements Runnable {
Socket socket;
Object service;
public ProcessorHandler(Object service, Socket socket) {
this.socket = socket;
this.service = service;
}
@Override
public void run() {
System.out.println("begin processor handler!");
ObjectInputStream objectInputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object restlt = invoke(rpcRequest);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(restlt);
objectOutputStream.flush();
objectInputStream.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Object[] args = request.getParams();
Class<?>[] types = new Class[args.length];
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Method method = service.getClass().getMethod(request.getMethodName(), types);
return method.invoke(service, args);
}
}
服務端主類 RpcServerMain:
public class RpcServerMain {
public static void main(String[] args) {
IHello hello = new HelloImpl();
RpcProxyServer rpcProxyServer = new RpcProxyServer();
rpcProxyServer.publisher(hello, 8080);
System.out.println(hello.sayHello("charles"));
}
}
客戶端
客戶端 IHello 類:
public interface IHello {
String sayHello(String string);
}
客戶端 RpcClientProxy 類:
public class RpcClientProxy {
public <T> T clientProxy(Class<T> interfaceCls, String host, int port) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
}
}
客戶端 RpcRequest 類:
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 383378368319625542L;
private String className;
private String methodName;
private Object[] params;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
@Override
public String toString() {
return "RpcRequest{" +
"className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", params=" + Arrays.toString(params) +
'}';
}
}
客戶端 RpcNetTransport 類:
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.net.Socket;
public class RpcNetTransport {
String host;
int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
private Socket createSocket() {
System.out.println("Begin create socket connect!");
Socket socket = null;
try {
socket = new Socket(host, port);
} catch (Exception e) {
throw new RuntimeException("build connect failed.");
}
return socket;
}
public Object send(RpcRequest rpcRequest) {
Socket socket = null;
try {
socket = createSocket();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(rpcRequest);
objectOutputStream.flush();
// 返回結果接收
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
Object resultObject = objectInputStream.readObject();// 反序列化 對象
objectInputStream.close();
objectOutputStream.close();
return resultObject;
} catch (Exception e) {
throw new RuntimeException("send request exception:" + e);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客戶端 RemoteInvocationHandler 類:
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class RemoteInvocationHandler implements InvocationHandler {
String host;
int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParams(args);
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
return rpcNetTransport.send(rpcRequest);
}
}
客戶端主類 RpcClientMain:
public class RpcClientMain {
public static void main(String[] args) {
RpcClientProxy rpcClientProxy = new RpcClientProxy();
IHello hello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080);
System.out.println(hello.sayHello("charles"));
}
}
項目啟動后客戶端向服務端發送了一條消息,分別運行兩個項目后輸出結果如下
服務端:
begin processor handler!
客戶端:
Begin create socket connect! Hello:charles
總結
RPC 本質為消息處理模型,RPC 屏蔽了底層不同主機間的通信細節,讓進程調用遠程的服務就像是本地的服務一樣。

浙公網安備 33010602011771號