RPC - 基于 SPI 机制实现一个简单的 RPC 案例

为了彻底理解 RPC(远程过程调用)的底层原理,我们这里不使用任何第三方 RPC 框架(如 Dubbo 或 Spring Cloud),而是使用 SPI 机制加载我们自己写的 HTTP 和 netty 实现。HTTP 服务使用内置的 Tomcat,通过最原始的 HTTP 请求 + Java 动态代理 + 反射,纯手工打造一个迷你版 RPC 框架。整个案例分成三个模块,公共模块 demo-common,服务提供者 demo-provider,以及服务的调用者 demo-consumer。下面是这三个模块的完整实现代码。


demo-common

依赖包:

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.18.0</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

接口定义:

1
2
3
public interface UserService {
String sayHello(String username);
}

公共类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 远程调用入参信息封装
*/
@Data
@AllArgsConstructor
public class Invocation implements Serializable {
private String interfaceName;
private String version;
private String methodName;
private Class[] paramType;
private Object[] params;
}

/**
* 远程注册中心元信息封装
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TargetServiceInfo implements Serializable {
private String hostname;
private Integer port;
}

公共组件 - 远程注册中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RemoteMapRegister {
private static final Map<String, List<TargetServiceInfo>> REGISTER = new HashMap<>();

public static void register(String interfaceName, String version, TargetServiceInfo info) {
List<TargetServiceInfo> list = REGISTER.get(interfaceName + version);
if (list == null) {
list = new ArrayList<>();
}
list.add(info);
REGISTER.put(interfaceName + version, list);
flushToFile(); // 由于跨进程获取信息,需要模拟存储,这里将注册中心公共信息存储到本地文件
}

public static List<TargetServiceInfo> get(String interfaceName, String version) {
if (REGISTER.isEmpty()) {
getFromFile();
}
return REGISTER.get(interfaceName + version);
}

private static void flushToFile() {
try (FileOutputStream fos = new FileOutputStream("/tmp/targetServiceInfos.txt");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fos)) {
objectOutputStream.writeObject(REGISTER);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
}
}

private static void getFromFile() {
try (FileInputStream fin = new FileInputStream("/tmp/targetServiceInfos.txt");
ObjectInputStream objectInputStream = new ObjectInputStream(fin)) {
Map<String, List<TargetServiceInfo>> mm = (Map) objectInputStream.readObject();
REGISTER.putAll(mm);
} catch (Exception e) {
e.printStackTrace();
}
}
}


demo-provider

依赖包:

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>com.owlias.janus</groupId>
<artifactId>demo-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>10.1.24</version>
</dependency>
</dependencies>

服务提供者启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.owlias.janus.framework.Transporter;
import com.owlias.janus.framework.register.LocalRegister;
import com.owlias.janus.register.RemoteMapRegister;
import com.owlias.janus.register.TargetServiceInfo;
import com.owlias.janus.service.UserService;
import com.owlias.janus.service.UserServiceImpl01;
import com.owlias.janus.service.UserServiceImpl02;
import java.util.ServiceLoader;

public class ProviderApp {
public static void main(String[] args) {
// 本地注册
LocalRegister.register(UserService.class.getName(), "1.0", UserServiceImpl01.class);
LocalRegister.register(UserService.class.getName(), "2.0", UserServiceImpl02.class);

// 在远程注册中心注册服务
TargetServiceInfo targetServiceInfo = new TargetServiceInfo("localhost", 8080);
RemoteMapRegister.register(UserService.class.getName(),"1.0", targetServiceInfo);
RemoteMapRegister.register(UserService.class.getName(),"2.0", targetServiceInfo);

//【SPI】动态寻找并加载当前类路径下被档案激活的 RpcServer 实现类
String protocolName = "http"; // System.getProperty("protocolName");
Transporter targetServer = null;
ServiceLoader<Transporter> serviceLoader = ServiceLoader.load(Transporter.class);
for (Transporter server : serviceLoader) {
if (server.support(protocolName)) {
targetServer = server;
break;
}
}
if (targetServer != null) {
targetServer.start("localhost", 8080);
} else {
throw new IllegalArgumentException("[SPI 错误] 未找到任何支持协议扩展名为 [" + protocolName + "] 的通信插件,请检查 META-INF/services 档案配置!");
}
}
}

服务的接口实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UserServiceImpl01 implements UserService {
@Override
public String sayHello(String username) {
return "hello version01: " + username;
}
}


public class UserServiceImpl02 implements UserService {
@Override
public String sayHello(String username) {
return "hello version02: " + username;
}
}

本地服务注册封装类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 本地服务注册封装类,实际会监控和同步远程注册中心的数据
*/
public class LocalRegister {
private static final Map<String, Class<?>> map = new HashMap<>();

public static void register(String interfaceName, String version, Class<?> implClass) {
map.put(interfaceName + version, implClass);
}

public static Class<?> get(String interfaceName, String version) {
return map.get(interfaceName + version);
}
}

定义统一的容器抽象接口,用于SPI的加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author KJ
* @description 定义统一的容器抽象接口(RPC 顶层规范),用于SPI的加载
*/
public interface Transporter {

/**
* 判断当前插件是否支持该协议别名 (如 "http", "netty")
*/
boolean support(String protocolName);

/**
* 启动远程通信容器
* @param hostname 绑定主机
* @param port 绑定端口
*/
void start(String hostname, Integer port);
}

在 demo-provider(或者专门的插件 Jar 包)的 resources 资源目录下,严格按照以下路径创建文件夹和文件:

  • 创建目录:src/main/resources/META-INF/services/
  • 在该目录下创建一个文本文件,文件名必须是接口的全限定名:com.owlias.janus.framework.Transporter
  • 在文件内容里,写上你想要激活的具体实现类的全限定名(一行一个)
1
2
com.owlias.janus.framework.http.HttpServer
com.owlias.janus.framework.netty.NettyServer

完成了 SPI 改造后,我们的主应用启动类将升华到真正的微内核、框架级高度。它不再和 Tomcat 或 Netty 发生任何强耦合!下面是两个 Transporter server 的实现:

HttpServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import com.owlias.janus.framework.Transporter;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.Server;
import org.apache.catalina.Service;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;

/**
* Tomcat 底层的物理组织结构:
* Server (服务器) ──> Service (服务) ──> Engine (引擎) ──> Host (虚拟主机) ──> Context (Web应用) ──> Wrapper (Servlet)
*/
public class HttpServer implements Transporter {

@Override
public boolean support(String protocolName) {
return "http".equalsIgnoreCase(protocolName);
}

@Override
public void start(String hostname, Integer port) {
// 创建顶层门面引导对象。Tomcat 类是方便开发者快速配置的工具类
Tomcat tomcat = new Tomcat();

// 从引导对象中获取最高级别的顶级组件:Server(代表整个 Tomcat 服务器实例)
Server server = tomcat.getServer();

// 从 Server 中捞出默认的名字叫 "Tomcat" 的 Service 组件。
Service service = server.findService("Tomcat");

// Engine 是 Servlet 容器的最高层(也就是常说的 Container 顶层),负责接收处理 Service 丢过来的所有连接请求
StandardEngine engine = new StandardEngine();
engine.setDefaultHost(hostname);

// 一个引擎下可以有多个 Host(比如一台物理机同时挂载了 www.aaa.com 和 www.bbb.com 两个域名)
StandardHost host = new StandardHost();
host.setName(hostname);
engine.addChild(host);

// 实例化标准上下文组件(在 Servlet 规范里,一个 Context 就代表一个独立的 Web 应用)
String contextPath = "";
StandardContext context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);

// 创建一个网络连接器 Connector,负责监听指定端口并处理物理网络 Socket 连接
Connector connector = new Connector();
connector.setPort(port);
service.setContainer(engine);
service.addConnector(connector);

// 向根上下文注册我们处理 RPC 核心路由的 Servlet
tomcat.addServlet(contextPath, "dispatcher", new DispatcherServlet());

// 配置 Servlet 的 URL 匹配规则,"/*" 代表这个 Tomcat 拦截下的所有网络请求,全部无条件转给上面的 dispatcher
context.addServletMappingDecoded("/*", "dispatcher");

try {
// 点火启动,顺着组件树生命周期,自上而下顺藤摸瓜启动所有内置组件(Connector、Engine、Host、Context)
tomcat.start();
// 强行让 Server 级服务进入 await 阻塞状态。如果不加这一行,主线程执行完 start() 后就会瞬间退出,Java 进程闪退。
tomcat.getServer().start();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}

DispatcherServlet:

1
2
3
4
5
6
7
8
9
10
11
12
13
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;

public class DispatcherServlet extends HttpServlet {

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
new HttpServletHandler().handler(req, resp);
}
}

HttpServletHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.owlias.janus.model.Invocation;
import com.owlias.janus.framework.register.LocalRegister;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;

public class HttpServletHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp) {
try {
Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();
String interfaceName = invocation.getInterfaceName();
String version = invocation.getVersion();

Class<?> classImpl = LocalRegister.get(interfaceName, version);
Method method = classImpl.getMethod(invocation.getMethodName(), invocation.getParamType());

String result = (String) method.invoke(classImpl.getConstructor().newInstance(), invocation.getParams());
IOUtils.write(result, resp.getOutputStream());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

NettyServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class NettyServer implements Transporter {

@Override
public boolean support(String protocolName) {
return "netty".equalsIgnoreCase(protocolName);
}

@Override
public void start(String hostname, Integer port) {
System.out.println("[SPI 插件] 正在通过高性能 Netty (Epoll) 异步双工通道启动服务...");
// ... 编写你的 Netty ServerBootstrap 逻辑
}
}


demo-comsumer

调用者服务启动测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.owlias.janus.framework.HttpClient;
import com.owlias.janus.model.Invocation;
import com.owlias.janus.framework.LoadBalance;
import com.owlias.janus.register.RemoteMapRegister;
import com.owlias.janus.register.TargetServiceInfo;
import com.owlias.janus.service.UserService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

public class ConsumerApp {
public static void main(String[] args) {
/*HttpClient httpClient = new HttpClient();
Invocation invocation = new Invocation(UserService.class.getName(),
"2.0",
"sayHello",
new Class[]{String.class},
new Object[]{"zhangsan"});
String result = httpClient.send("localhost", 8080, invocation);
System.out.println(result);*/

// 生成代理对象
UserService userService = (UserService) Proxy.newProxyInstance(UserService.class.getClassLoader(), new Class[]{UserService.class}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 从注册中心获取服务列表
List<TargetServiceInfo> serviceList = RemoteMapRegister.get(UserService.class.getName(), "2.0");
// 通过负载均衡路由到一台实际的服务节点
TargetServiceInfo targetService = LoadBalance.random(serviceList);

// 实际调用远程的服务接口
HttpClient httpClient = new HttpClient();
Invocation invocation = new Invocation(UserService.class.getName(),
"2.0",
"sayHello",
new Class[]{String.class},
new Object[]{"zhangsan"});
return httpClient.send(targetService.getHostname(), targetService.getPort(), invocation);
}
});

// 使用代理对象获取结果
String result = userService.sayHello("zhangsan");
System.out.println(result);
}
}

调用端负载均衡器:

一般的 RPC 服务,负载均衡器通常放在调用端,如果放在服务提供端的话,还需要 nginx 或其他前置组件。

1
2
3
4
5
6
7
8
9
10
import com.owlias.janus.register.TargetServiceInfo;
import java.util.List;
import java.util.Random;

public class LoadBalance {
public static TargetServiceInfo random(List<TargetServiceInfo>list) {
int index = new Random().nextInt(list.size());
return list.get(index);
}
}

调用工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.owlias.janus.model.Invocation;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class HttpClient {
public String send(String hostname, Integer port, Invocation invocation) {
try {
URL url = new URL("http", hostname, port, "/");
HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();

httpUrlConnection.setRequestMethod("POST");
httpUrlConnection.setDoOutput(true);

OutputStream outputStream = httpUrlConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(outputStream);
oos.writeObject(invocation);
oos.flush();
oos.close();

InputStream inputStream = httpUrlConnection.getInputStream();
String result = IOUtils.toString(inputStream);
return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}