Reading the RocketMQ Source (2) - The Basic Communication Module

Basic communication module

The module is built on Netty and also pulls in netty-tcnative.

Protocol format

  • 4 bytes: message length
  • 1 byte: serialization type
  • 3 bytes: header length
  • header
  • body
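
The layout above can be sketched with a plain ByteBuffer. This is a minimal illustration, not RocketMQ's own code: the class and method names are hypothetical, and it assumes the length field counts every byte that follows it and that the serialization type occupies the high byte of the second 4-byte word.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the frame layout listed above; all names here are hypothetical. */
final class FrameSketch {

    /** Packs: 4-byte length | 1-byte serialization type | 3-byte header length | header | body. */
    static byte[] encode(byte serializeType, byte[] header, byte[] body) {
        if (header.length > 0xFFFFFF) {
            throw new IllegalArgumentException("header length does not fit in 3 bytes");
        }
        // Assumption: the length field counts everything after itself.
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        // Serialization type in the high byte, header length in the low three bytes.
        buf.putInt(((serializeType & 0xFF) << 24) | header.length);
        buf.put(header);
        buf.put(body);
        return buf.array();
    }

    /** Reads the 1-byte serialization type back out of a frame. */
    static byte serializeType(byte[] frame) {
        return (byte) (ByteBuffer.wrap(frame).getInt(4) >>> 24);
    }

    /** Extracts the header bytes using the 3-byte header length. */
    static byte[] header(byte[] frame) {
        int headerLength = ByteBuffer.wrap(frame).getInt(4) & 0xFFFFFF;
        return Arrays.copyOfRange(frame, 8, 8 + headerLength);
    }

    /** Extracts the body: whatever remains after the header. */
    static byte[] body(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int length = buf.getInt(0);
        int headerLength = buf.getInt(4) & 0xFFFFFF;
        return Arrays.copyOfRange(frame, 8 + headerLength, 4 + length);
    }
}
```

Packing the type and the header length into one 4-byte word keeps the fixed prefix at 8 bytes, so a decoder can read the whole prefix before touching the variable-length parts.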

Serialization / deserialization

The serialization process can be seen in the encode() method of the RemotingCommand class. There are two implementations:

  • RocketMQSerializable: serializes to bytes, implemented with ByteBuffer
  • RemotingSerializable: serializes to JSON, implemented with fastjson
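
To make the difference concrete, here is a rough sketch of the binary style: fields are written in a fixed order with a ByteBuffer and read back in the same order. The class name and the field layout (code, opaque, remark) are hypothetical and are not RocketMQ's actual wire layout. The JSON style would carry the same data as text such as {"code":10,"opaque":7,"remark":"ok"}.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical header with a hand-rolled binary encoding (not RocketMQ's real layout). */
final class BinaryHeaderSketch {
    final short code;
    final int opaque;
    final String remark;

    BinaryHeaderSketch(short code, int opaque, String remark) {
        this.code = code;
        this.opaque = opaque;
        this.remark = remark;
    }

    /** Writes the fields in a fixed order; the string is length-prefixed. */
    byte[] encode() {
        byte[] remarkBytes = remark.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4 + remarkBytes.length);
        buf.putShort(code);
        buf.putInt(opaque);
        buf.putInt(remarkBytes.length);
        buf.put(remarkBytes);
        return buf.array();
    }

    /** Reads the fields back in exactly the order encode() wrote them. */
    static BinaryHeaderSketch decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        short code = buf.getShort();
        int opaque = buf.getInt();
        byte[] remarkBytes = new byte[buf.getInt()];
        buf.get(remarkBytes);
        return new BinaryHeaderSketch(code, opaque, new String(remarkBytes, StandardCharsets.UTF_8));
    }
}
```

The binary form carries no field names, so it is smaller than JSON but both sides must agree on the field order in advance.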

When obtaining a selector, the code handles Linux specially and tries to load sun.nio.ch.EPollSelectorProvider. The author runs macOS with the Java 1.8 HotSpot JVM, where sun.nio.ch.KQueueSelectorProvider is used instead.
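
A quick way to see which provider a given machine ends up with is sketched below. The class and method names are hypothetical, and the sketch only checks that the epoll provider class can be loaded on Linux; it does not reproduce how RocketMQ actually opens the selector.

```java
import java.nio.channels.spi.SelectorProvider;
import java.util.Locale;

/** Hypothetical probe that reports which selector provider class applies on this JVM. */
final class SelectorProbe {

    static boolean isLinux() {
        String os = System.getProperty("os.name");
        return os != null && os.toLowerCase(Locale.ROOT).contains("linux");
    }

    /** Returns the provider class name: the epoll provider on Linux if loadable, else the JVM default. */
    static String providerName() {
        if (isLinux()) {
            try {
                // Only verifies that the class is present; it is a JDK-internal class.
                return Class.forName("sun.nio.ch.EPollSelectorProvider").getName();
            } catch (ClassNotFoundException e) {
                // Fall through to the JVM default below.
            }
        }
        return SelectorProvider.provider().getClass().getName();
    }
}
```

Calling SelectorProbe.providerName() on the author's macOS setup would be expected to report the KQueue provider mentioned above; the exact name depends on the platform and JVM.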

Communication modes

rocketmq_design_3

The NettyRemotingAbstract class implements three communication modes:

  • sync
  • async
  • oneway
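
The three modes differ only in what the caller does about the response. The sketch below shows this with a toy in-process "server" (a plain function) instead of a Netty channel. All names are hypothetical, and returning a CompletableFuture from invokeAsync is a convenience of this sketch, not something taken from NettyRemotingAbstract.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy client showing sync, async and oneway calls against an in-process function. */
final class InvokeModesSketch {
    private final Function<String, String> server; // stand-in for the remote side

    InvokeModesSketch(Function<String, String> server) {
        this.server = server;
    }

    /** sync: block the caller until the response arrives or the timeout expires. */
    String invokeSync(String request, long timeoutMillis) {
        try {
            return CompletableFuture.supplyAsync(() -> server.apply(request))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    /** async: return immediately; the callback runs once the response is available. */
    CompletableFuture<Void> invokeAsync(String request, Consumer<String> callback) {
        return CompletableFuture.supplyAsync(() -> server.apply(request)).thenAccept(callback);
    }

    /** oneway: send and forget; any response is discarded. */
    void invokeOneway(String request) {
        CompletableFuture.runAsync(() -> server.apply(request));
    }
}
```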

Flow of an asynchronous request

rocketmq_design_5

  1. client: assemble the RemotingCommand and create the callback (MQClientAPIImpl.sendMessage())

  2. client: create a channel to the target address (NettyRemotingClient.getAndCreateChannel())

  3. client: acquire a semaphore permit (NettyRemotingAbstract.invokeAsyncImpl())

  4. client: register a ResponseFuture keyed by the request's opaque ID (NettyRemotingAbstract.invokeAsyncImpl())

  5. client: call writeAndFlush on the Netty channel (NettyRemotingAbstract.invokeAsyncImpl())

  6. serverHandler: receive the message and hand it off for processing

  7. server: handle it in the processMessageReceived method

  8. server: look up the processor that matches the request code

  9. server: run the business processor and get the response

  10. server: check whether the request is one-way

  11. server: set the opaque ID on the response and send it back to the client

  12. clientHandler: read the message received by Netty and process it

  13. client: look up the ResponseFuture by opaque ID

  14. client: execute the callback
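
The client side of this flow (acquire a permit, register the callback under an opaque ID, write, then match the response by opaque ID) can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in NettyRemotingAbstract: the class, the `wire` stand-in for channel.writeAndFlush, and the plain Consumer used in place of ResponseFuture are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Sketch of opaque-based request/response correlation with a bounded number of in-flight requests. */
final class AsyncInvokeSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final ConcurrentHashMap<Integer, Consumer<String>> responseTable = new ConcurrentHashMap<>();
    private final Semaphore permits;
    private final BiConsumer<Integer, String> wire; // hypothetical stand-in for channel.writeAndFlush

    AsyncInvokeSketch(int maxInFlight, BiConsumer<Integer, String> wire) {
        this.permits = new Semaphore(maxInFlight);
        this.wire = wire;
    }

    /** Acquire a permit, register the callback, write. Returns the opaque ID, or -1 if no permit was available. */
    int invokeAsync(String body, Consumer<String> callback, long timeoutMillis) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        if (!acquired) {
            return -1;
        }
        int opaque = nextOpaque.incrementAndGet();
        responseTable.put(opaque, callback);
        try {
            wire.accept(opaque, body);
        } catch (RuntimeException e) {
            // The write failed: undo the registration so the permit is not leaked.
            if (responseTable.remove(opaque) != null) {
                permits.release();
            }
            throw e;
        }
        return opaque;
    }

    /** Client handler side: find the callback by opaque ID, release the permit, run the callback. */
    void onResponse(int opaque, String body) {
        Consumer<String> callback = responseTable.remove(opaque);
        if (callback == null) {
            return; // unknown opaque ID, or the response was already handled
        }
        permits.release();
        callback.accept(body);
    }

    /** Number of requests still waiting for a response. */
    int inFlight() {
        return responseTable.size();
    }
}
```

The semaphore caps how many requests can be waiting at once, and the opaque ID is what lets responses arrive in any order on a shared channel and still reach the right callback.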

Tips

Get the name of the current OS

System.getProperty("os.name");

References

2-Communication mechanism at master · apache/rocketmq

RocketMQ: Communication Protocol - 薛定谔的风口猪