RocketMQ 发送消息(一条消息从发送到存储的过程)

目录:

  • 前言
  • quickStart
  • 单刀直入
  • Remoting 模块发送消息实现
  • 如何处理返回值
  • Broker Server 处理消息流程

前言

RocketMQ 目前在国内应该是比较流行的 MQ 了,楼主目前也在使用中,今天借着本文,理理 RocketMQ 发送一条消息到存储一条消息的过程。

注意:本文主线是发送到存储,因此,阅读源码时,其他和这条线相关度不高的代码,会酌情阅读。另外,本文的目的是为了看清一条消息是如何被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等,并不会花很多篇幅介绍。

quickStart

来自官方源码 example 的一段发送代码:

1
2
3
4
5
6
7
8
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();

Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

producer.shutdown();

单刀直入

我们直接看看 send 方法。

send 方法会设置一个默认的 timeout, 3 秒。
默认使用 SYNC 模式,另外有 Async 和 OneWay 模式。
我们需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。

DefaultMQProducerImpl # sendDefaultImpl 方法就是发送的主要逻辑。

这端代码里,有个有趣的地方,可以提一下,关于更新故障时间的策略,RMQ 有一个类 MQFaultStrategy,用来处理 MQ 错误,然后对 MQ Server 进行服务降级。

对照图:

这个策略具体内容:如果发送一条消息在 550 毫秒以内,那么就不用降级,如果 550 毫秒以外,就进行容错降级(熔断)30 秒,以此类推。
再看 DefaultMQProducerImpl # sendKernelImpl 发送到内核的方法实现。

先找到 broker 的地址。尝试压缩大于 4M 的消息(批量消息不压缩)。执行各种钩子。构造 Request 对象(存放数据),Context 上下文对象(存放调用上下文)。

这里会设置一个消息生成时间,即 bornTimestamp。后面使用消息轨迹的时候,可以查看。

最后,如果是 SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient # invokeSync 发送消息。

注意,在 MQClientAPIImpl # sendMessage 这一层,会给命令对象设置一个 CmdCode,叫 SEND_MESSAGE,这个东西就是一个和 Broker 的契约,Broker 会根据这个 Code 进行不同的策略。另外,如果这里用 RPC 的方式,例如,使用一个接口的抽象方法,然后 Broker 对抽象方法进行 RPC 调用,这样可不可以呢?

最后,看看 remotingClient # invokeSync 是如何实现的。

Remoting 模块发送消息实现

invokeSync 方法首先执行 RPCBefore 钩子,类似 Spring 的各种 Bean 扩展组件,然后就是对超时进行判断。可以看到,每个方法几乎都有对超时的判断,超时判断和超时处理在分布式场景非常重要。

然后根据 addr 找到对应的 Socket Channel。然后执行 invokeSyncImpl 方法。

这里其实和其他大部分的 RPC 框架都是类似的了,生产一个永远自增的 Request ID,创建一个 Feature 对象,和这个 ID 绑定,方便 Netty 返回数据对这个 ID 对应的线程进行唤醒。
然后调用 Netty 的 writeAndFlush 方法,将数据写进 Socket,同时添加一个监听器,如果发送失败,唤醒当前线程。

发送完毕之后,当前线程进行等待,使用 CountDownLatch.wait 方法实现,当 Netty 返回数据时,使用 CountDownLatch.countDown 进行唤醒,然后返回从 Broker 写入的结果,可能成功,也可能失败,需要到上层(Client 层)解析,网络层只负责网络的事情。

我们知道, Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});

我们看到,这里使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。

XXCoder 就是对 Cmd 对象进行序列化和反序列化的。这里的空闲使用的读写最大空闲时间为 120s,超过这个,就会触发空闲事件。RMQ 就会关闭 Channel 连接。而针对空闲事件进行处理的就是连接管理器了。

连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。

如何处理返回值

我们看了 RMQ 中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 找到 Response .
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);

responseTable.remove(opaque);

if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {// 返回结果
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}

这里,通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。

好,到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了,我们小结一下。根据模块层次,我们记录一下 sendMessage 的过程:

层次还是比较清晰的。

我们再来看看 Server 端如何处理一条消息的。

Broker Server 处理消息流程

从哪里入手呢?

我们上面看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。

在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。

这一步我们停一下,再去看看 netty Server 端的 Handler。

NettyRemotingServer 是处理 Request 的类,他的 ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler 处理器,这个处理器的 channelRead0 方法会调用 NettyRemotingServer 的父类 processMessageReceived 方法。

这个方法会从 processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor, Processor 由 2 部分组成,一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO 线程。

关键代码:

1
2
3
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 处理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

前后都是执行一些钩子,例如 ACL 啥的。

这里我们小结一下,RMQ 会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。

中间是处理 Request 请求的。这个 processRequest 方法,有很多的实现,如下图,我们主要看 SendMessageProcessor 的实现。

SendMessageProcessor # sendMessage 是处理消息的主要逻辑。

关键代码:
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

消息存储引擎,这里我们看 DefaultMessageStore 的 putMessage 实现。

首先一堆校验。注意,其中有一个地方:

1
2
3
if (this.isOSPageCacheBusy()) {// 检查 mmp 忙不忙.
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

由于 RMQ 写数据是王 PageCache 里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。

最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。

如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。

看看 commitLog 的 putMessage 方法实现。

先拿到最新的 MappedFile 文件,MappedFile 文件的命名是用 offset 命名的,一个文件默认 1gb,这个大小和 mmp 的机制有关,通常不能过大。

然后上锁,这段代码是可以说整个 RMQ Server 的热点区域,

这里上锁会记录上锁的时间,方便前面做 PageCache Busy 的判断。

写入代码:

result = mappedFile.appendMessage(msg, this.appendMessageCallback)

写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。

统计。

处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。

经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。
而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。

如果用全异步的话,我的 4c8g 的机器,单机 tps 最高能 2 万多。美滋滋。

跑题了。

我们看看 mappedFile.appendMessage 方法的实现。

一路追踪,有个关键逻辑, 在 appendMessagesInner 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
// 写数据到 缓存
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}

代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。

我们可以看到,一条消息有太多的内容:
总长度、魔数、CRC 校验、队列 ID、各种 flag、存储时间,物理 offset、存储 IP、时间戳、扩展属性等等。

最终,这条消息会被写入到 MMap 中。

那什么时候刷盘呢?

如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。

另外,如果没有新数据,则为 500ms 执行一次刷盘策略。

简单说下异步刷盘:

  1. 默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。
  2. 如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘.
  3. 执行 mappedByteBuffer.force() 或者 fileChannel.force(false);

我们这里小结一下,看看 RMQ Server 处理处理一条消息的:

总结

来张大图总结一下。

篇幅有限,下篇再一起看看 RMQ 如何消费消息。


RocketMQ 发送消息(一条消息从发送到存储的过程)
http://thinkinjava.cn/2019/05/08/2019/2019-05-09-rmq-sendmsg/
作者
莫那·鲁道
发布于
2019年5月8日
许可协议