Basic communication module
Built on Netty, with netty-tcnative pulled in as a dependency.
Protocol format
- 4 bytes: message length
- 1 byte: serialization type
- 3 bytes: header length
- header
- body
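The layout above can be sketched with a plain ByteBuffer. This is a minimal illustration, not RocketMQ's actual code: the class and method names are hypothetical, and it assumes that the 4-byte message length counts everything after the length field itself and that the serialization type occupies the high byte of the second 4-byte word.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the frame layout; all names here are hypothetical.
final class FrameCodecSketch {

    // Packs the 1-byte serialization type (high byte) and the 3-byte header length (low bytes).
    static int packTypeAndHeaderLength(byte serializeType, int headerLength) {
        if (headerLength < 0 || headerLength > 0xFFFFFF) {
            throw new IllegalArgumentException("header length must fit in 3 bytes");
        }
        return ((serializeType & 0xFF) << 24) | headerLength;
    }

    static byte serializeTypeOf(int packed) {
        return (byte) ((packed >>> 24) & 0xFF);
    }

    static int headerLengthOf(int packed) {
        return packed & 0xFFFFFF;
    }

    // Assumption: the length field covers the packed word, the header and the body,
    // but not the 4 bytes of the length field itself.
    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.putInt(packTypeAndHeaderLength(serializeType, header.length));
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    // Returns {header, body} read from a frame produced by encode().
    static byte[][] decode(ByteBuffer frame) {
        int length = frame.getInt();
        int packed = frame.getInt();
        byte[] header = new byte[headerLengthOf(packed)];
        frame.get(header);
        byte[] body = new byte[length - 4 - header.length];
        frame.get(body);
        return new byte[][] {header, body};
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        byte[][] parts = decode(encode((byte) 0, header, body));
        System.out.println(new String(parts[0], StandardCharsets.UTF_8));
        System.out.println(new String(parts[1], StandardCharsets.UTF_8));
    }
}
```

Packing the type and the header length into a single int keeps the fixed part of the frame at 8 bytes, at the cost of capping the header at 0xFFFFFF bytes.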
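Serialization / deserialization
The serialization process can be seen in the encode() method of the RemotingCommand class.
- RocketMQSerializable: serializes to bytes, implemented with ByteBuffer
- RemotingSerializable: serializes to JSON, implemented with fastjson
When obtaining the selector, Linux gets special handling: the code tries to load sun.nio.ch.EPollSelectorProvider. On my macOS machine with the Java 1.8 HotSpot JVM, sun.nio.ch.KQueueSelectorProvider was used instead.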
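The Linux special case can be sketched roughly as follows. This is an illustration only, not the code RocketMQ ships: the class and method names are hypothetical, and the sketch falls back to the JDK's default provider whenever the epoll provider cannot be loaded or instantiated, which newer JDKs may refuse because sun.nio.ch is an internal package.

```java
import java.nio.channels.spi.SelectorProvider;
import java.util.Locale;

// Illustrative sketch; class and method names are hypothetical.
final class SelectorProviderSketch {

    static boolean isLinux(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).contains("linux");
    }

    // Tries the epoll provider on Linux, otherwise uses the platform default
    // (for example the kqueue-based provider on macOS).
    static SelectorProvider pickProvider(String osName) {
        if (isLinux(osName)) {
            try {
                Class<?> clazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
                Object instance = clazz.getDeclaredConstructor().newInstance();
                if (instance instanceof SelectorProvider) {
                    return (SelectorProvider) instance;
                }
            } catch (ReflectiveOperationException | RuntimeException | LinkageError e) {
                // Class missing or not accessible on this JVM: fall through to the default.
            }
        }
        return SelectorProvider.provider();
    }

    public static void main(String[] args) {
        SelectorProvider provider = pickProvider(System.getProperty("os.name"));
        System.out.println(provider.getClass().getName());
    }
}
```

Running main on a given machine prints whichever provider class that JVM ends up with.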
Communication modes
The NettyRemotingAbstract class implements three communication modes:
- sync
- async
- oneway
Asynchronous request flow
client: assemble the RemotingCommand and create the callback (MQClientAPIImpl.sendMessage())
client: get or create a channel for the target addr (NettyRemotingClient.getAndCreateChannel())
client: acquire the semaphore (NettyRemotingAbstract.invokeAsyncImpl())
client: register a ResponseFuture keyed by the request's opaque (NettyRemotingAbstract.invokeAsyncImpl())
client: call writeAndFlush on the Netty channel (NettyRemotingAbstract.invokeAsyncImpl())
serverHandler: receive the message and hand it over for processing
server: the processMessageReceived method handles the message
server: look up the processor that matches the business code
server: the business processor executes and produces the response
server: check whether the request is one-way
server: set the opaque on the response and send it back to the client
clientHandler: read the message received by Netty and process it
client: look up the ResponseFuture by opaque
client: execute the callback
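The bookkeeping in the steps above (semaphore, opaque, pending table, callback) can be sketched in-process without Netty. This is a simplified illustration under stated assumptions: all names are hypothetical, the "server" is a plain function called directly in place of channel.writeAndFlush, so the callback runs before invokeAsync returns, and timeouts are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative in-process sketch of the async flow; all names are hypothetical.
final class AsyncInvokeSketch {

    // Minimal stand-in for a remoting command.
    static final class Command {
        final int opaque;
        final boolean oneway;
        final String body;

        Command(int opaque, boolean oneway, String body) {
            this.opaque = opaque;
            this.oneway = oneway;
            this.body = body;
        }
    }

    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final Semaphore asyncPermits;
    private final Map<Integer, Consumer<String>> pending = new ConcurrentHashMap<>();
    private final Function<String, String> serverProcessor;

    AsyncInvokeSketch(int maxInFlight, Function<String, String> serverProcessor) {
        this.asyncPermits = new Semaphore(maxInFlight);
        this.serverProcessor = serverProcessor;
    }

    // Client: acquire a permit, register the callback under a fresh opaque, then "send".
    boolean invokeAsync(String body, Consumer<String> callback) {
        if (!asyncPermits.tryAcquire()) {
            return false; // too many requests in flight
        }
        int opaque = nextOpaque.incrementAndGet();
        pending.put(opaque, callback);
        serverReceive(new Command(opaque, false, body)); // stand-in for writeAndFlush
        return true;
    }

    // Client: one-way requests register nothing because no response is expected.
    void invokeOneway(String body) {
        serverReceive(new Command(nextOpaque.incrementAndGet(), true, body));
    }

    // Server: run the processor, skip the reply for one-way requests, echo the opaque otherwise.
    private void serverReceive(Command request) {
        String result = serverProcessor.apply(request.body);
        if (request.oneway) {
            return;
        }
        clientReceive(new Command(request.opaque, false, result));
    }

    // Client: find the callback by opaque, run it, and release the permit.
    private void clientReceive(Command response) {
        Consumer<String> callback = pending.remove(response.opaque);
        if (callback == null) {
            return; // unknown or already completed request
        }
        try {
            callback.accept(response.body);
        } finally {
            asyncPermits.release();
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int availablePermits() {
        return asyncPermits.availablePermits();
    }

    public static void main(String[] args) {
        AsyncInvokeSketch client = new AsyncInvokeSketch(2, s -> s.toUpperCase());
        client.invokeAsync("ping", reply -> System.out.println("callback got: " + reply));
    }
}
```

The opaque is what lets a single channel carry many in-flight requests: the response only needs to echo it back for the client to find the right callback.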
tips
Get the current OS name
System.getProperty("os.name");
reference