SOFA-源码分析-——-服务发布过程

## 前言

SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA—RPC 源码中的例子,看看他是如何发布一个服务的。

示例代码

下面的代码在 com.alipay.sofa.rpc.quickstart.QuickStartServer 类下。

1
2
3
4
5
6
7
8
9
10
11
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(9696) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端

providerConfig.export(); // 发布服务

首先,创建一个 ServerConfig ,包含了端口,协议等基础信息,当然,这些都是手动设定的,在该类加载的时候,会自动加载很多配置文件中的服务器默认配置。比如 RpcConfigs 类,RpcRuntimeContext 上下文等。

然后呢,创建一个 ProviderConfig,也是个 config,不过多继承了一个 AbstractInterfaceConfig 抽象类,该类是接口级别的配置,而 ServerConfig 是 服务器级别的配置。虽然都继承了 AbstractIdConfig。

ProviderConfig 包含了接口名称,接口指定实现类,还有服务器的配置。

最后,ProviderConfig 调用 export 发布服务。

展示给我的 API 很简单,但内部是如何实现的呢?

在看源码之前,我们思考一下:如果我们自己来实现,怎么弄?

RPC 框架简单一点来说,就是使用动态代理和 Socket。

SOFA 使用 Netty 来做网络通信框架,我们之前也写过一个简单的 Netty RPC,主要是通过 handler 的 channelRead 方法来实现。

SOFA 是这么操作的吗?

一起来看看。

# 源码分析

上面的示例代码其实就是 3 个步骤,创建 ServerConfig,创建 ProviderConfig,调用 export 方法。

先看第一步,还是有点意思的。

虽然是空构造方法,但 ServerConfig 的属性都是自动初始化的,而他的父类 AbstractIdConfig 更有意思了,父类有 1 个地方值得注意:

1
2
3
static {
RpcRuntimeContext.now();
}

熟悉类加载的同学都知道,这是为了主动加载 RpcRuntimeContext ,看名字是 RPC 运行时上下文,所谓上下文,大约就是我们人类聊天中的 “老地方” 的意思。

这个上下文会在静态块中加载 Module(基于扩展点实现),注册 JVM 关闭钩子(类似 Tomcat)。还有很多配置信息。

然后呢?创建 ProviderConfig 对象。这个类比上面的那个类多继承了一个 AbstractInterfaceConfig,接口级别的配置。比如有些方法我不想发布啊,比如权重啊,比如超时啊,比如具体的实现类啊等等,当然还需要一个 ServerConfig 的属性(注册到 Server 中啊喂)。

最后就是发布了。export 方法。

ProviderCofing 拥有一个 export 方法,但并不是直接就在这里发布的,因为他是一个 config,不适合在config 里面做这些事情,违背单一职责。

SOFA 使用了一个 Bootstrap 类来进行操作。和大部分服务器类似,这里就是启动服务器的地方。因为这个类会多线程使用,比如并发的发布服务。而不是一个一个慢慢的发布服务。所以他不是单例的,而是和 Config 一起使用的,并缓存在 map 中。

ProviderBootstrap 目前有 3 个实现:Rest,Bolt,Dubbo。Bolt 是他的默认实现。

export 方法默认有个实现(Dubbo 的话就要重写了)。主要逻辑是执行 doExport 方法,其中包括延迟加载逻辑。

而 doExport 方法中,就是 SOFA 发布服务的逻辑所在了。

楼主将方法的异常处理逻辑去除,整体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void doExport() {
if (exported) {
return;
}
String key = providerConfig.buildKey();
String appName = providerConfig.getAppName();
// 检查参数
checkParameters();
// 注意同一interface,同一uniqleId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
// 超过最大数量,直接抛出异常
}
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
}

// 注册到注册中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();

// 记录一些缓存数据
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}

主要逻辑如下:

  1. 根据 providerConfig 创建一个 key 和 AppName。
  2. 检验同一个服务多次发布的次数。
  3. 创建一个 ProviderProxyInvoker, 其中包含了过滤器链,而过滤器链的最后一链就是对接口实现类的调用。
  4. 初始化注册中心,创建 Server(会有多个Server,因为可能配置了多个协议)。
  5. 将 config 和 invoker 注册到 Server 中。内部是将其放进了一个 Map 中。
  6. 启动 Server。启动 Server 其实就是启动 Netty 服务,并创建一个 RpcHandler,也就是 Netty 的 Handler,这个 RpcHandler 内部含有一个数据结构,包含接口级别的 invoker。所以,当请求进入的时候,RpcHandler 的 channelRead 方法会被调用,然后间接的调用 invoker 方法。
  7. 成功启动后,注册到注册中心。将数据缓存到 RpcRuntimeContext 的一个 Set 中。

一起来详细看看。

Invoker 怎么构造的?很简单,最主要的就是过滤器。关于过滤器,我们之前已经写过一篇文章了。不再赘述。

关键看看 Server 是如何构造的。

关键代码 serverConfig.buildIfAbsent(),类似 HashMap 的 putIfAbsent。如果不存在就创建。

Server 接口目前有 2 个实现,bolt 和 rest。当然,Server 也是基于扩展的,所以,不用怕,可以随便增加实现。

关键代码在 ServerFactory 的 getServer 中,其中会获取扩展点的 Server,然后,执行 Server 的 init 方法,我们看看默认 bolt 的 init 方法。

1
2
3
4
5
6
7
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
// 启动线程池
bizThreadPool = initThreadPool(serverConfig);
boltServerProcessor = new BoltServerProcessor(this);
}

保存了 serverConfig 的引用,启动了一个业务线程池,创建了一个 BoltServerProcessor 对象。

第一:这个线程池会在 Bolt 的 RpcHandler 中被使用,也就是说,复杂业务都是在这个线程池执行,不会影响 Netty 的 IO 线程。

第二:BoltServerProcessor 非常重要,他的构造方法包括了当前的 BoltServer,所以他俩是互相依赖的。关键点来了:

BoltServerProcessor 实现了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一个 Map<String, UserProcessor<?>>,所以,当 RpcHandler 被执行 channelRead 方法的时候,一定会根据接口名称找到对应的 UserProcessor,并执行他的 handlerRequest 方法。

那么,RpcHandler 是什么时候创建并放置到 RpcHandler 中的呢?

具体是这样的:在 server.start() 执行的时候,该方法会初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,将 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。然后,当 RpcServer 启动的时候,也就是 start 方法,会执行一个 init 方法,该方法内部就是设置 Netty 各种属性的地方,包括 Hander,其中有 2 行代码对我们很重要:

1
2
final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
pipeline.addLast("handler", rpcHandler);

创建了一个 RpcHandler,并添加到 pipeline 中,这个 Handler 的构造参数就是包含所有 BoltServerProcessor 的 Map。

所以,总的流程就是:

每个接口都会创建一个 providerConfig 对象,这个对象会创建对应的 invoker 对象(包含过滤器链),这两个对象都会放到 BoltServer 的 invokerMap 中,而 BoltServer 还包含其他对象,比如 BoltServerProcessor(继承 UserProcessor), RpcServer(依赖 RpcHandler)。当初始化 BoltServerProcessor 的时候,会传入 this(BoltServer),当初始化 RpcServer 的时候,会传入 BoltServerProcessor 到 RpcServer 的 Map 中。在 RpcHandler 初始化的时候,又会将 RpcServer 的 Map 传进自己的内部。完成最终的依赖。
当请求进入,RpcHandler 调用对应的 UserProcessor 的 handlerRequest 方法,而该方法中,会调用对应的 invoker,invoker 调用过滤器链,知道调用真正的实现类。

而大概的 UML 图就是下面这样的:

image.png

红色部分是 RPC 的核心,包含 Solt 的 Server,实现 UserProcessor 接口的 BoltServerProcessor,业务线程池,存储所有接口实现的 Map。

绿色部分是 Bolt 的接口和类,只要实现了 UserProcessor 接口,就能将具体实现替换,也既是处理具体数据的逻辑。

最后,看看关键类 BoltServerProcessor ,他是融合 RPC 和 Bolt 的胶水类。

该类会注册一个序列化器替代 Bolt 默认的。handleRequest 方法是这个类的核心方法。有很多逻辑,主要看这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 查找服务
Invoker invoker = boltServer.findInvoker(serviceName);
// 真正调用
response = doInvoke(serviceName, invoker, request);

/**
* 找到服务端Invoker
*
* @param serviceName 服务名
* @return Invoker对象
*/
public Invoker findInvoker(String serviceName) {
return invokerMap.get(serviceName);
}

根据服务名称,从 Map 中找到服务,然后调用 invoker 的 invoker 方法。

再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的调用链,使用 IDEA 的 Hierarchy 功能,查看该方法,最后停留在 ProcessTast 中,一个 Runnable.

image.png

根据经验,这个类肯定是被放到线程池了。什么时候放的呢?看看他的构造方法的 Hierarchy。

image.png

从图中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最终会调用 ProcessTask 的 构造方法。

那么 BoltServer 的用户线程池什么时候使用呢?还是使用 IDEA 的 Hierarchy 功能。

image.png

其实也是在这个过程中,当用户没有设置线程池,则使用系统线程池。

总结

好了,关于 SOFA 的服务发布和服务的接收过程,就介绍完了,可以说,整个框架还是非常轻量级的。基本操作就是:内部通过在 Netty的 Handler 中保存一个存储服务实现的 Map 完成远程调用。

其实和我们之前用 Netty 写的小 demo 类似。


SOFA-源码分析-——-服务发布过程
http://thinkinjava.cn/2018/04/29/2018/2018-04-29-SOFA-源码分析-——-服务发布过程/
作者
莫那·鲁道
发布于
2018年4月29日
许可协议