什么是RPC

我的理解是,RPC(远程过程调用)是一种让程序能够像调用本地函数一样调用远程服务的思想概念。而RPC协议,则是详细设计了通信的数据格式(如JSON、XML、Protobuf)、传输协议(TCP/HTTP等)和序列化方式,从而将复杂的网络通信细节封装起来。

任何人都可以定义自己的rpc协议,只要:

  1. 明确的格式规范:定义请求和响应数据的结构、字段、编码方式
  2. 完整的调用语义:包含方法标识、参数传递、返回值、错误处理机制
  3. 一致的序列化:双方使用相同的序列化/反序列化规则

基本功能实现

下面,我将完整介绍如何实现一个基本的RPC框架,主要涉及服务的注册和远程调用。

环境配置

创建一个Maven项目,然后分别创建如下四个模块

image-20251216163234842

通用模块

通用模块存放了远程调用的接口和一些公共配置。消费者需要这个调用这个接口的远程实现。而服务端提供了这个接口的实现类,并可供消费者调用。

结构很简单,如图所示

image-20251216163829595

在common模块中定义一个HelloServer的接口

1
2
3
4
5
6
7
8
9
10
11
/**
*
* 远程调用的通用接口
*
* @author ldy
* @date 2025/12/15 23:36
*
*/
public interface HelloServer {
public String sayHello(String msg);
}

服务消费者

消费者中定义了启动类ConsumerApplication,通过调用RPC框架的api来进行远程方法的调用。具体方法内容的编写,看后面rpc框架实现的部分。

image-20251216164103317

注意引入公共模块和rpc模块的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.example</groupId>
<artifactId>Common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.example</groupId>
<artifactId>MyRpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

服务提供者

服务提供者模块定义了ProviderApplication,通过调用rpc框架的api来注册服务和启动服务。ProviderApplication的定义看rpc框架实现部分。

image-20251216164303981

HelloServerImpl的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
*
* HelloServer的实现类
*
* @author ldy
* @date 2025/12/15 23:42
*
*/
public class HelloServerImpl implements HelloServer {

@Override
public String sayHello(String msg) {
System.out.println("hello " + msg + ", this is provider");
return "perfect";
}
}

同样,记得配置依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.example</groupId>
<artifactId>Common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.example</groupId>
<artifactId>MyRpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

自定义Rpc框架

要想实现Rpc框架,我们就需要考虑几个问题:

  • 服务端要实现服务启动和服务注册
  • 客户端要实现服务请求发送
  • rpc框架要定义好通用的数据通信对象

image-20251216165059385

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependencies>
<!--tomcat依赖-->
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.31</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.38</version>
</dependency>

<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.42</version>
</dependency>
</dependencies>

HttpServer服务端启动类

这里使用了Tomcat来启动服务。注意导入tocat的依赖包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
*
* http服务端
*
* @author ldy
* @date 2025/12/15 23:44
*
*/
public class HttpServer {
/**
* 启动tomcat服务
* @param hostName 主机名
* @param port 端口号
*/
public void start(String hostName, Integer port){
Tomcat tomcat = new Tomcat();

Server server = tomcat.getServer();
Service service = server.findService("Tomcat");

Connector connector = new Connector();
connector.setPort(port);

Engine engine = new StandardEngine();
engine.setDefaultHost(hostName);

StandardHost host = new StandardHost();
host.setName(hostName);

String contextPath = "";
StandardContext context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());

host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);

//对于contextPath路径的请求,交给自定义的DispatcherServlet进行处理
//注册DispatcherServlet,名称为dispatcher,可以处理的请求路径为contextPath
tomcat.addServlet(contextPath, "dispatcher", new DispatcherServlet());
//映射URL, contextPath + /* 路径下的请求都会交给dispatcher处理
context.addServletMapping("/*", "dispatcher");

try{
tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}

DispatcherServlet自定义请求处理中转类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
*
* 处理rpc请求中转类
*
* @author ldy
* @date 2025/12/15 23:54
*
*/
public class DispatcherServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
//为方便扩展,这里不直接处理请求,而是调用请求处理类来进行处理
ServerHandler serverHandler = new ServerHandler();
try {
serverHandler.handle(req, resp);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

ServerHandler请求处理类

具体定义了如何去处理请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
*
* 请求处理实现类
*
* @author ldy
* @date 2025/12/15 23:57
*
*/
public class ServerHandler {
public void handle(HttpServletRequest req, HttpServletResponse resp) throws Exception {
//处理请求的目的就是调用远程服务提供的方法,
// 所以消费者必须提供要调用的接口名称、方法名称、方法参数类型、方法参数名称
Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();

//通过接口名称,获取要调用的类的实例
Object instance = ServerRegister.getInstance(invocation.getInterfaceName());
//通过反射调用目标方法
Method method = instance.getClass().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
method.setAccessible(true);
Object res = method.invoke(instance, invocation.getParameters());

//将结果写入到输出流中
try(ServletOutputStream outputStream = resp.getOutputStream();){
ObjectOutputStream oos = new ObjectOutputStream(resp.getOutputStream());
oos.writeObject(res);
}

}
}

Invocation通用请求调用类

规定了通信的内容。即要明确远程调用什么类、它的什么方法、它的参数类型是什么、参数是什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
*
* 请求调用类
*
* @author ldy
* @date 2025/12/16 00:00
*
*/
@Data
@AllArgsConstructor
public class Invocation implements Serializable {
/**
* 接口名称
*/
private String interfaceName;

/**
* 方法名称
*/
private String methodName;

/**
* 方法参数类型数组
*/
private Class[] parameterTypes;

/**
* 方法参数
*/
private Object[] parameters;
}

ServerRegister服务方法注册器

服务启动时,将提供的方法对象的实例注册到该注册器中,当需要远程过程调用时,即可通过反射的方式去调用相应的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
*
* 服务注册类
*
* @author ldy
* @date 2025/12/16 00:05
*
*/
public class ServerRegister {
//使用HashMap存储调用的类名和实现类的实例
private static HashMap<String, Object> serverMaps = new HashMap<>();

/**
* 注册服务
* @param serverName 服务名称
* @param instance 服务实例
*/
public static void register(String serverName, Object instance){
if (serverMaps.containsKey(serverName)) {
System.out.println(serverName + "存在相同名称的服务已注册");
return;
}
serverMaps.put(serverName, instance);
}

public static Object getInstance(String serverName) throws Exception {
Object o = serverMaps.get(serverName);
return Optional.ofNullable(o)
.orElseThrow(() -> new Exception("服务实例不存在"));
}
}

HttpClient客户端请求类

定义了客户端发送远程调用请求的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
*
* http客户端
*
* @author ldy
* @date 2025/12/16 00:31
*
*/
public class HttpClient {
/**
* 发起http请求的远程调用
* @param hostName 域名
* @param port 端口
* @param invocation 请求参数
* @return json序列化结果
*/
public String send(String hostName, Integer port, Invocation invocation) throws Exception {
URL url = new URL("http", hostName, port, "/");

HttpURLConnection urlConnection = (HttpURLConnection)url.openConnection();
urlConnection.setRequestMethod("POST");
urlConnection.setDoOutput(true);

//配置请求体
OutputStream os = urlConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(invocation);
oos.flush();
oos.close();

//获取相应内容
InputStream is = urlConnection.getInputStream();
ObjectInputStream ois = new ObjectInputStream(is);
Object o = ois.readObject();

return JSONUtil.toJsonStr(o);
}
}

实现ProviderApplication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
*
* 服务提供者启动器
*
* @author ldy
* @date 2025/12/15 23:43
*
*/
public class ProviderApplication {
public static void main(String[] args) {
//注册远程调用提供的方法
ServerRegister.register(HelloServer.class.getSimpleName(), new HelloServerImpl());

//通过rpc框架, 启动Tomcat 服务
HttpServer httpServer = new HttpServer();
httpServer.start("localhost", 8080);
}
}

实现ConsumerApplication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
*
* 消费者启动器
*
* @author ldy
* @date 2025/12/15 23:43
*
*/
public class ConsumerApplication {
public static void main(String[] args) throws Exception {
//远程调用Provider提供的方法
HttpClient httpClient = new HttpClient();
Invocation invocation = new Invocation(
HelloServer.class.getSimpleName(),
"sayHello",
new Class[]{String.class},
new Object[]{"ldy"});

String res = httpClient.send("localhost", 8080, invocation);
System.out.println(res);
}
}

高级功能实现

高级功能的实现代码,在上面代码的基础上有较大改动,只贴出部分关键代码。完整源码上传至github。

一、动态代理

在上基础功能中,我们本地服务在调用远程服务时,需要消费者主动调用HttpClient的方法,并手动封装Invocation。这显然不够方便。所以,为了能够像OpenFeign一样,只需要调用接口的方法就好了。我们这里就可以使用动态代理的方式,给接口创建一个动态代理的实现类。

在Rpc框架下新建proxy包。创建一个ProxyHelper类来帮助我们创建代理对象。在代理类内部疯转Invocation对象,并调用HttpClient发起调用远程调用请求。这样子,消费者只需要通过该类创建接口的代理对象,然后直接调用方法就可以获取到结果了。

下面代码是完整的代码,包含了负载均衡和服务降级的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
*
* 获取代理对象的通用方法
*
* @author ldy
* @date 2025/12/20 16:38
*
*/
@Slf4j
public class ProxyHelper {



/**
* 获取代理对象
* @param interfaceClass 目标接口
* @return 代理对象
* @param <T>
*/
public static <T> T getProxyInstance(Class<T> interfaceClass) throws InstantiationException, IllegalAccessException {
Object o = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

//生成请求调用类
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args);


//服务发现
List<URL> urLs = MapRemoteRegister.getURLs(interfaceClass.getName());

//json字符串,存放远程调用返回的结果
String jsonStr = null;
//服务重试
int max = 3;
while(max > 0){
//负载均衡
URL url = LoadBalance.getRandom(urLs);

try{
//远程调用
jsonStr = HttpClient.send(url.getHost(), url.getPort(), invocation);
break;
}catch(Exception e){
log.info("第{}次--服务器{}:{}的{}类的{}方法调用异常", (4-max), url.getHost(), url.getPort(), interfaceClass.getName(), method.getName());
System.out.println(url.getHost() + ":" + url.getPort() + ",远程调用异常" + (4 - max));
}
max --;
if(max == 0){
//降级处理
Object callBack = CallBackRegister.getCallBack(interfaceClass.getName());
//如果降级类存在,则通过反射调用降级类的方法直接返回即可
if(callBack != null){
return method.invoke(callBack, args);
}
}
}

if(jsonStr == null){
System.out.println("远程调用失败,达到最大重试次数,且无降级处理类");
}

Class<?> returnType = method.getReturnType();
//如果返回值类型就是字符串,则无需进行json反序列化
if(returnType == String.class){
return jsonStr;
}
Object res = JSONUtil.toBean(jsonStr, returnType);
return res;
}
});
return (T) o;
}
}

二、服务注册

当前代码中,消费者调用远程服务需要知道服务提供者的URL信息才行,当存在多个提供相同服务的实例时,消费者就不好确定给哪个提供者发起请求,且管理提供者也很麻烦,所以我们需要引入一个注册中心,这样子消费者就无需关心提供者的URL信息,只需要知道注册中心的URL,由注册中心返回给消费者服务提供者的URL信息,从而减轻消费者的维护负担。

一个注册中心主要维护的就是服务和提供服务实例的映射关系。一般来说会使用redis或者zookeeper这种分布式数据管理中间件来实现。但这里为了方便实现。直接通过本地文件充当注册中心的方式来实现。

首先创建一个远程注册服务的类,该类提供了将服务全类名和服务提供者的URL信息注册到注册中心(本地文件)的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
*
* 注册服务信息到注册中心的类
* 以Map实现
*
* @author ldy
* @date 2025/12/20 17:24
*
*/

public class MapRemoteRegister {

//key -- 服务全类名, value -- 提供服务的url信息
public static HashMap<String, List<URL>> remoteMap = new HashMap<>();

/**
* 注册服务到远程的注册中心
* @param interfaceName 服务名称
* @param url url信息
*/
public static void register(String interfaceName, URL url){
List<URL> urls = remoteMap.get(interfaceName);
if(urls == null){
urls = new ArrayList<>();
remoteMap.put(interfaceName, urls);
}
urls.add(url);
saveFile();
}

/**
* 获取服务的所有url信息
* @param interfaceName 服务名称
* @return
*/
public static List<URL> getURLs(String interfaceName){
// Set<URL> urls = remoteMap.get(interfaceName);
List<URL> urls = null;
try {
urls = getMapByFile().get(interfaceName);
} catch (Exception e) {
throw new RuntimeException(e);
}
return urls;
}

public static Map<String, List<URL>> getMapByFile() throws Exception {
try(ObjectInputStream ois = new ObjectInputStream(new FileInputStream("./temp.txt"));){
return (Map) ois.readObject();
}
}

public static void saveFile(){
try(ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("./temp.txt"));){
oos.writeObject(remoteMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

服务提供者只需要调用register方法即可将服务信息写入注册中心。而服务调用方可以通过getURLs方法从注册中心里获取提供对应服务的URL地址信息集合。

1
2
//注册远程调用提供的服务
ServerRegister.register(HelloServer.class.getName(), new HelloServerImpl());

三、负载均衡

负载均衡的目的是为了让消费者获取到当前可用的最不繁忙的服务提供者提供的URL信息。这部分既可以在消费者端进行,也可以直接在注册中心进行处理。因为我们的注册中心是存放在本地文件的,所以负载均衡的部分就交给消费者端调用了。

首先创建一个负载均衡的工具类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
*
* 负载均衡实现类
*
* @author ldy
* @date 2025/12/20 18:15
*
*/
public class LoadBalance {
/**
* 获取随机的URL
* @return
*/
public static URL getRandom(List<URL> urls){

Random random = new Random();
int target = random.nextInt(urls.size());
return urls.get(target);
}
}

然后在创建动态代理类的方法上添加获取url地址的方法。完整代码看动态代理部分。

1
2
3
4
//服务发现
List<URL> urLs = MapRemoteRegister.getURLs(interfaceClass.getName());
//负载均衡
URL url = LoadBalance.getRandom(urLs);

四、服务降级

为了保证当远程服务不可用时有一个兜底策略。我们需要再创建一个注册降级服务的类,当远程服务不可用时,就在调用本地对该接口的实现类的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
*
* 服务降级注册类
*
* @author ldy
* @date 2025/12/20 18:49
*
*/
public class CallBackRegister {
//降级处理类集合
private static Map<String, Object> callBackMap = new HashMap<>();

//注册降级类
public static void register(String interfaceName, Object instance){
callBackMap.put(interfaceName, instance);
}

//获取降级类
public static Object getCallBack(String interfaceName){
return callBackMap.get(interfaceName);
}
}

然后在消费者端创建接口的实现类并注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
*
* HelloServer的降级处理类
*
* @author ldy
* @date 2025/12/20 18:52
*
*/
public class HelloServerCallBack implements HelloServer {
@Override
public Person sayHello(String msg) {
return new Person("down");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class ConsumerApplication {
public static void main(String[] args) throws Exception {

//注册降级处理类
CallBackRegister.register(HelloServer.class.getName(), new HelloServerCallBack());

//获取HelloService的代理对象,远程调用的细节交给代理对象进行处理
HelloServer proxyInstance = ProxyHelper.getProxyInstance(HelloServer.class);
Person res = proxyInstance.sayHello("你好,我是ldy");
System.out.println(res.toString());
}
}

同时,也要在动态代理部分检测降级类是否存在,当服务重试次数到达上限时,就可用通过调用降级类来实现业务兜底,而不是直接抛出异常。

画个大饼

在rpc框架基础上,开发一个springboot-rpc-starter依赖。使用springboot自动配置特性、自定义注解、自定义配置和利用redis实现注册中心的访问。