1. 1. 内核IO模型
    1. 1.1. 网络包接收流程
    2. 1.2. 同步与异步, 阻塞与非阻塞
    3. 1.3. IO多路复用 TODO
      1. 1.3.1. select
      2. 1.3.2. poll
      3. 1.3.3. epoll
    4. 1.4. Reactor模型
    5. 1.5. Netty对Reactor模型的实现
      1. 1.5.1. 主ReactorGroup
      2. 1.5.2. 从ReactorGroup
  2. 2. Reactor在Netty中的实现(创建)
    1. 2.1. Netty服务端代码模板
    2. 2.2. 创建用于启动EventLoop线程的executor
    3. 2.3. 创建EventLoop
    4. 2.4. 创建Channel到EventLoop的绑定策略
  3. 3. Netty启动流程
    1. 3.1. 结构
    2. 3.2. Netty服务端的启动
  4. 4. Reactor的架构
    1. 4.1. Reactor线程的整个运行框架
    2. 4.2. Reactor线程轮询IO就绪事件
    3. 4.3. Reactor处理IO与处理异步任务的时间比例分配
    4. 4.4. Reactor线程处理IO就绪事件
    5. 4.5. Reactor线程处理异步任务
    6. 4.6. 规避JDK Epoll空轮询bug
  5. 5. Netty接收网络连接
    1. 5.1. MainReactor处理OP_ACCEPT事件
    2. 5.2. doReadMessages接收客户端连接
    3. 5.3. ChannelRead事件的响应
    4. 5.4. 向从ReactorGroup注册NioSocketChannel
  6. 6. Netty接收网络数据
    1. 6.1. SubReactor处理OP_READ事件流程总览
    2. 6.2. Netty接收网络数据流程总览
    3. 6.3. ChannelRead和ChannelReadComplete的区别
    4. 6.4. 源码核心框架总览
      1. 6.4.1. 分配DirectByteBuffer接收网络数据
    5. 6.5. ByteBuffer动态自适应括缩容机制
    6. 6.6. 使用堆外内存为ByteBuffer分配内存
  7. 7. Recycler对象池
    1. 7.1. 背景
    2. 7.2. 对象在JVM中创建和回收开销
    3. 7.3. 对象池Recycler
    4. 7.4. Recycler总体设计
      1. 7.4.1. 获取对象无锁化设计
      2. 7.4.2. 释放对象无锁化设计
    5. 7.5. Recycler实现
  8. 8. Netty发送数据流程
    1. 8.1. write方法发送数据
    2. 8.2. Flush
    3. 8.3. 发送数据
    4. 8.4. Socket写满16次未写完
    5. 8.5. 处理OP_WRITE
    6. 8.6. writeAndFlush
  9. 9. Pipeline, 详解所有 IO 事件的触发时机以及传播路径
    1. 9.1. Pipeline的创建
      1. 9.1.1. HeadContext
      2. 9.1.2. TailContext
    2. 9.2. Pipeline中的事件
      1. 9.2.1. Inbond事件
      2. 9.2.2. OutBond事件
    3. 9.3. 向Pipeline添加ChannelHandler
    4. 9.4. ChannelHandlerContext的创建
    5. 9.5. 从pipeline删除ChannelHandler
    6. 9.6. 初始化pipeline
    7. 9.7. 事件传播
  10. 10. Netty 如何处理 TCP 连接的正常关闭, 异常关闭, 半关闭场景
    1. 10.1. TCP连接正常关闭
    2. 10.2. Netty处理TCP连接正常关闭
    3. 10.3. TCP连接异常关闭
    4. 10.4. Netty对RST包的处理
    5. 10.5. TCP连接半关闭HalfClosure
    6. 10.6. 主动关闭方发起TCP半关闭
    7. 10.7. 被动关闭放处理TCP半关闭

《聊聊Netty那些事》阅读笔记

内核IO模型

网络包接收流程

接收网络数据包的详细开销

  • 网卡DMA拷贝网络数据包到内存中的开销(DMA指网卡可以在没有CPU参与的情况下独立写内存)
  • CPU收到硬中断后调用网卡驱动进行响应的开销
  • OS的内核线程ksoftirqd响应软中断的开销
  • 应用程序通过系统调用从用户态转为内核态的开销, 系统调用返回时从内核态转为用户态的开销
  • 网络数据从内存内核空间通过CPU拷贝到用户空间的开销

网络包接收过程

网络包发送过程

同步与异步, 阻塞与非阻塞

接收网络数据包的2个流程

  • 数据准备阶段: 在这个阶段, 网络数据包到达网卡, 通过DMA的方式将数据包拷贝到内存中, 然后经过硬中断, 软中断, 接着通过内核线程ksoftirqd经过内核协议栈的处理, 最终将数据发送到内核Socket的接收缓冲区
  • 数据拷贝阶段: 当数据到达内核Socket的接收缓冲区中时, 此时数据存在于内核空间中, 需要将数据拷贝到用户空间中, 才能够被应用程序读取

参考《凤凰架构》读书摘要网络IO模型一节

网关路由 | 凤凰架构

  • 同步IO(Synchronous I/O)
    • 阻塞IO(Blocking I/O), 节省 CPU 资源(Java传统IO模型)
    • 非阻塞IO(Non-Blocking I/O), 浪费 CPU 资源(Java的NIO)
    • 多路复用IO(Multiplexing I/O), 主流(通过NIO实现的Reactor模式)
    • 信号驱动IO(Signal-Driven I/O), 需要自己从缓冲区获取数据
  • 异步IO(Asynchronous I/O)(通过AIO实现的Proactor模式)

阻塞与非阻塞的区别主要发生在第一阶段: 数据准备阶段

  • 阻塞模式, 当Socket的接收缓冲区中没有数据的时候, 应用线程会一直等待
  • 非阻塞模式, 应用线程不会等待, 系统调用直接返回错误标志EWOULDBLOCK

同步与异步主要的区别发生在第二阶段: 数据拷贝阶段

  • 同步模式在数据准备好后, 是由用户线程的内核态来执行第二阶段, 所以应用程序会在第二阶段发生阻塞, 直到数据从内核空间拷贝到用户空间, 系统调用才会返回
  • 异步模式下是由内核来执行第二阶段的数据拷贝操作, 当内核执行完第二阶段, 会通知用户线程IO操作已经完成, 并将数据回调给用户线程

IO多路复用 TODO

select

select

poll

epoll

epoll

在Netty中实现的EpollSocketChannel默认的就是边缘触发模式, JDK的NIO默认是水平触发模式

epoll对select, poll的优化

  1. epoll在内核中通过红黑树管理海量的连接, 所以在调用epoll_wait获取IO就绪的socket时, 不需要传入监听的socket文件描述符, 从而避免了海量的文件描述符集合在用户空间和内核空间中来回复制

  2. epoll仅会通知IO就绪的socket, 避免了在用户空间遍历的开销

  3. epoll通过在socket的等待队列上注册回调函数ep_poll_callback通知用户程序IO就绪的socket, 避免了在内核中轮询的开销

Reactor模型

Reactor基于NIO, Proactor基于AIO

  • 单Reactor单线程
    • 单Reactor意味着只有一个epoll对象, 监听所有的事件, 包括连接事件, 读写事件
    • 单线程意味着只有一个线程来执行epoll_wait获取IO就绪的Socket, 然后对这些就绪的Socket执行读写, 后续的业务处理也依然是这个线程
  • 单Reactor多线程
    • 只有一个epoll对象来监听所有的IO事件, 一个线程来调用epoll_wait获取IO就绪的Socket
    • 当IO就绪事件产生时, 这些IO事件对应处理的业务Handler, 我们是通过线程池来执行, 这样相比单Reactor单线程模型提高了执行效率, 充分发挥了多核CPU的优势
  • 主从Reactor多线程
    • 主Reactor处理连接事件
    • 当创建好连接, 建立好对应的socket后, 在acceptor中将需要监听的read事件注册到从Reactor中, 由从Reactor来监听socket上的读写事件
1
2
3
4
5
6
7
8
9
10
11
12
// 配置单Reactor单线程
EventLoopGroup eventGroup = new NioEventLoopGroup(1);
new ServerBootstrap().group(eventGroup);

// 配置多Reactor线程
EventLoopGroup eventGroup = new NioEventLoopGroup();
new ServerBootstrap().group(eventGroup);

// 配置主从Reactor多线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap().group(bossGroup, workerGroup);

Netty对Reactor模型的实现

Netty中的reactor

Reactor在netty中是以group的形式出现的, netty中将Reactor分为两组, 一组是主ReactorGroup也就是bossGroup, 另一组是从ReactorGroup也就是workerGroup

主ReactorGroup

主ReactorGroup中通常只有一个Reactor, 专门负责监听连接accept事件, 当有连接事件产生时, 在acceptor中创建相应的NioSocketChannel(代表一个Socket连接), 然后以负载均衡的方式在从ReactorGroup中选一个Reactor, 注册监听Read事件

主ReactorGroup中只有一个Reactor, 因为通常我们服务端程序只会绑定监听一个端口, 如果要绑定监听多个端口, 就会配置多个Reactor

从ReactorGroup

从ReactorGroup中有多个Reactor, 具体Reactor的个数可以由系统参数指定, 默认的Reactor的个数为CPU核数 * 2, 从ReactorGroup中的Reactor主要负责监听读写事件, 每一个Reactor负责监听一组socket连接

Socket连接在创建后就被固定的分配给一个Reactor, 所以一个Socket连接也只会被一个固定的IO线程执行, 这种无锁串行化的设计的目的是为了防止多线程并发执行同一个socket连接上的IO逻辑处理, 防止出现线程安全问题

  1. 轮询IO就绪事件
  2. 处理IO事件
  3. 执行异步任务

由于每个Reactor中只有一个IO线程, ChannelHandler中执行的逻辑不能耗时太长, 尽量将耗时的业务逻辑处理放入单独的业务线程池中处理, 否则会影响其他连接的IO读写, 从而影响整个服务程序的IO吞吐

Reactor在Netty中的实现(创建)

Netty服务端代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 配置主从Reactor
.channel(NioServerSocketChannel.class) // 配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100) // 设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO)) // 设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {
// 设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// 绑定端口启动服务, 开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

创建主从ReactorGroup, 在Netty中EventLoopGroup就是ReactorGroup的实现类, EventLoop就是Reactor的实现类

SocketChannel

  • NioServerSocketChannel, 监听Socket, 负责listen和bind, 使用ServerBootstrapAcceptor创建NioSocketChannel
  • NioSocketChannel, 客户端连接socket, 负责与客户端之间的网络通信

serverBootstrap.handler设置的是服务端NioServerSocketChannel PipeLine中的ChannelHandler, ServerBootstrap启动类方法带有child前缀的均是设置客户端NioSocketChannel属性的

创建用于启动EventLoop线程的executor

ThreadPerTaskExecutor, 来一个任务就创建一个线程执行, EventLoop线程的核心就是一个死循环不停的轮询IO就绪事件, 处理IO事件, 执行异步任务

创建EventLoop

EventLoop结构

EventLoop线程组NioEventLoopGroup包含多个EventLoop, 存放于private final EventExecutor[] children

EventLoop的核心是使用IO多路复用模型来对客户端连接上的IO事件进行监听, 所以最重要的事情是创建Selector

openSelector是NioEventLoop类中用于创建IO多路复用的Selector, 并对创建出来的JDK NIO原生的Selector进行性能优化:

  1. SelectorImpl中存放可用socket的HashSet使用反射修改为SelectedSelectionKeySet, 底层实现从HashMap变为数组, 优化遍历、插入性能, 利用cpu cache
  2. 使用代理模式, 用SelectedSelectionKeySetSelector代理selector的真实实现类, 在每次遍历结束后清空SelectedSelectionKeySet的内容

EventLoop内的异步任务队列的类型为MpscQueue,它是由JCTools提供的一个高性能无锁队列, 从命名前缀Mpsc可以看出, 它适用于多生产者单消费者的场景

EventLoop负责执行的异步任务分为三类:

  • 普通任务: 这是Netty最主要执行的异步任务, 存放在普通任务队列taskQueue中, 在NioEventLoop构造函数中创建
  • 定时任务: 存放在优先级队列中
  • 尾部任务: 存放于尾部任务队列tailTasks中, 尾部任务一般不常用, 在普通任务执行完后 EventLoop线程会执行尾部任务, 比如对Netty的运行状态做一些统计数据, 例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新

创建Channel到EventLoop的绑定策略

无论是Netty服务端NioServerSocketChannel关注的OP_ACCEPT事件, 还是Netty客户端NioSocketChannel关注的OP_READOP_WRITE事件, 都需要先注册到EventLoop上, EventLoop才能监听Channel上关注的IO事件实现IO多路复用

MultithreadEventExecutorGroup类的构造器参数EventExecutorChooserFactory负责创建Channel到EventLoop的绑定策略, 默认为round-robin轮询, 如果EventLoop数量为2的次幂可以用移位快速计算

Netty启动流程

EventLoop启动流程图

结构

不管是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel, 每个Channel实例都会有一个Pipeline, Pipeline中有多个ChannelHandler用于编排处理对应Channel上感兴趣的IO事件

NioServerSocketChannel中的Pipeline添加ChannelHandler分为两种方式

  1. 显式添加: 用户通过ServerBootstrap#handler添加, 如果需要添加多个ChannelHandler, 则可以通过ChannelInitializer向pipeline中进行添加
  2. 隐式添加: 隐式添加主要添加的就是MainEventLoopGroup的核心组件, 也就是图中的acceptor, Netty中的实现为ServerBootstrapAcceptor, 本质上也是一种ChannelHandler, 主要负责在客户端连接建立好后, 初始化客户端NioSocketChannel, 在从EventLoop线程组中选取一个SubEventLoop, 将客户端NioSocketChannel注册到SubEventLoop中的selector上

EventLoopGroup结构

由于在Netty的IO线程模型中, 是由单个SubEventLoop线程负责执行客户端NioSocketChannel中的Pipeline, 一个SubEventLoop线程负责处理多个NioSocketChannel上的IO事件, 如果Pipeline中的ChannelHandler添加的太多, 就会影响SubEventLoop线程执行其他NioSocketChannel上的Pipeline, 从而降低IO处理效率, 降低吞吐量

所以Pipeline中的ChannelHandler不易添加过多, 并且不能在ChannelHandler中执行耗时的业务处理任务

Netty服务端的启动

EventLoop启动后的结构

Netty服务端的启动流程总体如下:

  1. 创建服务端NioServerSocketChannel并初始化
  2. 将服务端NioServerSocketChannel注册到主EventLoopGroup
  3. 注册成功后, 开始初始化NioServerSocketChannel中的pipeline, 然后在pipeline中触发ChannelRegister事件
  4. 随后由NioServerSocketChannel绑定端口地址
  5. 绑定端口地址成功后, 向NioServerSocketChannel对应的Pipeline中触发传播ChannelActive事件, 在事件回调中向MainEventLoop注册OP_ACCEPT事件, 开始等待客户端连接, 服务端启动完成

代码执行流程:

  1. 创建NioServerSocketChannel
  2. 初始化NioServerSocketChannel
  3. MainEventLoop注册NioServerSocketChannel
    1. MainEventLoopGroup中选取一个MainEventLoop进行注册
    2. 向绑定后的MainEventLoop进行注册
    3. MainEventLoop的启动
    4. startThread
    5. register0, 封装为异步任务
    6. doRegister(), 添加Acceptor封装为异步任务
    7. HandlerAdded事件回调中初始化ChannelPipeline
    8. 回调regFuture的ChannelFutureListener
  4. doBind0, 封装为异步任务
  5. 绑定端口地址
    1. HeadContext
    2. channelActive事件处理, 封装为异步任务
    3. beginRead

UnsafeChannel接口的一个内部接口, 用于定义实现对Channel底层的各种操作, Unsafe接口定义的操作行为只能由Netty框架的Reactor线程调用, 用户线程禁止调用

Netty自定义的SocketChannel类型均继承AttributeMap接口以及DefaultAttributeMap类, 正是它们定义了ChannelAttributes, 用于向Channel添加用户自定义的一些信息

初始化NioServerSocketChannel中pipeline的时机是: 当NioServerSocketChannel注册到MainEventLoop之后, 绑定端口地址之前

MainEventLoop线程是在提交第一个异步任务的时候启动的, 在用户程序(Main线程)提交用于注册NioServerSocketChannel的异步任务时开始启动

Reactor的架构

Reactor线程的整个运行框架

Netty中的Reactor线程主要干三件事情:

  • 轮询注册在EventLoop上的所有Channel感兴趣的IO就绪事件
  • 处理Channel上的IO就绪事件
  • 执行Netty中的异步任务

EventLoop线程其实执行的就是一个死循环, 在死循环中不断的通过Selector去轮询IO就绪事件, 如果发生IO就绪事件则从Selector系统调用中返回并处理IO就绪事件, 如果没有发生IO就绪事件则一直阻塞在Selector系统调用上, 直到满足Selector唤醒条件:

  1. 当Selector轮询到有IO活跃事件发生时
  2. EventLoop线程需要执行的定时任务到达任务执行时间deadline时
  3. 当有异步任务提交给EventLoop时, EventLoop线程需要从Selector上被唤醒, 这样才能及时的去执行异步任务

EventLoop工作流程图

Reactor线程轮询IO就绪事件

在Reactor线程的轮询工作开始之前, 需要首先判断下当前是否有异步任务需要执行, 判断依据就是查看Reactor中的异步任务队列taskQueue和用于统计信息任务用的尾部队列tailTask是否有异步任务

如果Reactor中有异步任务需要执行, 那么Reactor线程需要立即执行, 不能阻塞在Selector上, 在返回前需要再顺带调用selectNow()非阻塞查看一下当前是否有IO就绪事件发生, 如果有, 那么正好可以和异步任务一起被处理, 如果没有, 则及时地处理异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
/**
* Reactor线程要保证及时的执行异步任务
* 1: 如果有异步任务等待执行, 则马上执行selectNow()非阻塞轮询一次IO就绪事件
* 2: 没有异步任务, 则跳到switch select分支
*/
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
// NioEventLoop
private final IntSupplier selectNowSupplier = new IntSupplier() {
public int get() throws Exception {
// 非阻塞
return selector.selectNow();
}
};

NioEventLoopScheduledExecutorService的子类, 支持执行定时任务, 在阻塞轮询IO任务之前通过AbstractScheduledEventExecutornextScheduledTaskDeadlineNanos获取到下个定时任务执行的时间作为阻塞超时时间

异步任务在被提交后希望立马得到执行, 那么就在提交异步任务的时候去唤醒正在阻塞轮询的Reactor线程

1
2
3
4
5
6
7
8
9
10
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// 执行到这里说明EventLoop已经从Selector上被唤醒了
// 设置EventLoop的状态为苏醒状态AWAKE
// lazySet优化不必要的volatile操作, 不使用内存屏障, 不保证写操作的可见性(单线程不需要保证)
nextWakeupNanos.lazySet(AWAKE);
}

Reactor处理IO与处理异步任务的时间比例分配

Netty通过ioRatio变量来调配EventLoop线程在处理IO事件和执行异步任务之间的CPU时间分配比例, 防止EventLoop线程处理异步任务时间过长而导致I/O 事件得不到及时地处理

ioRatio / 100 = 执行异步任务时间 / (执行异步任务时间 + 轮询IO事件时间), 默认50

Reactor线程处理IO就绪事件

  1. processSelectedKeysPlain, JDK实现
    1. 获取IO就绪的Channel
    2. 处理Channel上的IO事件
      1. 处理Connect事件
      2. 处理Write事件
      3. 处理Read事件或者Accept事件
    3. 从Selector中移除失效的SelectionKey, 为了保证Selector中所有KeySet的有效性, 需要在Channel取消个数达到256时, 触发一次selectNow, 目的是清除无效的SelectionKey
  2. processSelectedKeysOptimized, Netty实现
    1. 数组需要手动做清除

服务端NioServerSocketChannel中的Read方法处理的是Accept事件, 客户端NioSocketChannel中的Read方法处理的是Read事件

Reactor线程处理异步任务

EventLoop线程执行异步任务的核心逻辑:

  1. 先将到期的定时任务从定时任务队列scheduledTaskQueue中全部取出并转存到普通任务队列taskQueue中
  2. EventLoop线程统一从普通任务队列taskQueue中取出任务执行
  3. EventLoop线程执行完定时任务和普通任务后, 开始执行存储于尾部任务队列tailTasks中的尾部任务

规避JDK Epoll空轮询bug

JDK NIO Epoll的空轮询BUG会导致EventLoop线程在没有任何事情可做的情况下被意外唤醒, 导致CPU空转

既没有IO就绪事件, 也没有异步任务, EventLoop线程从Selector上被异常唤醒, 发生512次之后认为已触发bug, 则重建Selector(将之前注册的所有Channel重新注册到新的Selector上并关闭旧的Selector), selectCnt计数归0

Netty接收网络连接

MainReactor处理OP_ACCEPT事件

Netty将OP_ACCEPT事件处理的入口函数封装在NioServerSocketChannel里的底层操作类Unsafe的read方法中

接收客户端连接

main reactor线程是在一个无限循环read loop中不断的调用JDK NIO serverSocketChannel.accept()方法来接收完成三次握手的客户端连接NioSocketChannel的, 并将接收到的NioSocketChannel临时保存在List<Object> readBuf集合中, 后续在NioServerSocketChannel的pipeline中通过ChannelRead事件来传递, 最终会在ServerBootstrapAcceptor这个ChannelHandler中被处理初始化, 并将其注册到SubEventLoop

这里的read loop循环会被限定只能读取16次, 当MainEventLoopNioServerSocketChannel中读取客户端连接NioSocketChannel的次数达到16次之后, 无论此时是否还有客户端连接都不能在继续读取了, 因为还需要分配时间去执行异步任务, 不能因为无限制的接收客户端连接而耽误了异步任务的执行

doReadMessages接收客户端连接

根据ServerSocketChannel的accept方法获取到JDK NIO 原生的SocketChannel(用于底层真正与客户端通信的Channel), 来创建Netty中的NioSocketChannel

NioServerSocketChannelNioSocketChannel的不同

  1. NioServerSocketChannelEventLoop启动过程中创建, NioSocketChannelEventLoop接收连接时被NioServerSocketChannel创建
  2. NioServerSocketChannelMainEventLoop注册OP_ACCEPT事件, NioSocketChannelSubEventLoop注册OP_READ事件
  3. NioServerSocketChannel继承AbstractNioMessageChannel, 输出的Message指的是SocketChannel客户端连接; NioSocketChannel继承的是AbstractNioByteChannel, 输出的是网络数据Byte

NioSocketChannel结构

ChannelRead事件的响应

ServerBootstrapAcceptor主要的作用就是初始化客户端NioSocketChannel, 并将客户端NioSocketChannel注册到从ReactorGroup中, 并监听OP_READ事件

向从ReactorGroup注册NioSocketChannel

  1. 从ReactorGroup中选取一个从Reactor进行绑定
  2. 向绑定的从Reactor上注册NioSocketChannel
  3. register0

Netty接收网络数据

SubReactor处理OP_READ事件流程总览

当网络数据到达服务端的网卡并经过内核协议栈的处理, 最终数据到达Socket的接收缓冲区之后, SubEventLoop轮询到NioSocketChannel上的OP_READ事件就绪, 随后SubEventLoop线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回, 转而去处理NioSocketChannel上的OP_READ事件

SubEventLoop在处理Channel上的IO事件入口函数为NioEventLoop#processSelectedKey

Netty接收网络数据流程总览

Netty接收网络数据流程

NioSocketChannel读取连接数据的read loop中受最大读取次数的限制, 默认配置最多只能读取16次, 超过16次无论此时NioSocketChannel中是否还有可读数据都不能在进行读取了

  • lastBytesRead < 0: 表示客户端主动发起了连接关闭流程, Netty开始连接关闭处理流程
  • lastBytesRead = 0: 表示当前NioSocketChannel上的数据已经全部读取完毕, 没有数据可读, 本次OP_READ事件处理完毕
  • lastBytesRead > 0: 表示在本次read loop中从NioSocketChannel中读取到了数据, 会在NioSocketChannel的pipeline中触发ChannelRead事件, 进而在pipeline中负责IO处理的ChannelHandelr中响应, 处理网络请求

ChannelRead和ChannelReadComplete的区别

  • ChanneRead事件: 一次循环读取一次数据, 就触发一次ChannelRead事件, 本次最多读取在read loop循环开始分配的DirectByteBuffer容量大小
  • ChannelReadComplete事件: 当读取不到数据或者不满足continueReading的任意一个条件就会退出read loop, 这时就会触发ChannelReadComplete事件

触发ChannelReadComplete事件并不代表NioSocketChannel中的数据已经读取完了, 只能说明本次OP_READ事件处理完毕, 因为有可能是客户端发送的数据太多, Netty读了16次还没读完, 那就只能等到下次OP_READ事件到来的时候在进行读取了

源码核心框架总览

分配DirectByteBuffer接收网络数据

NioSocketChannel的2个ByteBufAllocator

  • ByteBufAllocator是一个PooledByteBufAllocator的实例, 内存池, 用来管理堆外内存DirectByteBuffer
  • RecvByteBufAllocator是一个AdaptiveRecvByteBufAllocator类的实例, 可以动态调整ByteBuffer的容量, 初始为2048

RecvByteBufAllocator计算大小, 然后ByteBufAllocator进行内存分配

ByteBuffer动态自适应括缩容机制

容量索引表

  1. 当索引容量小于512时, 容量索引从16开始按16递增
  2. 当索引容量大于512时, 容量索引按前一个索引容量的2倍递增

AdaptiveRecvByteBufAllocator类中定义的扩容步长INDEX_INCREMENT = 4, 缩容步长INDEX_DECREMENT = 1

  • 扩容: 取容量索引向后走4步对应的size
  • 缩容: 取容量索引向前走1步对应的size, 满足两次缩容条件才会进行缩容

使用堆外内存为ByteBuffer分配内存

JDK接收请求的拷贝次数

  1. 网卡 -> 内核空间, 使用DMA
  2. 内核空间 -> 用户空间, 系统调用触发
  3. 堆外内存 -> 堆内存, JVM拷贝

Netty使用堆外内存的好处

  1. 减少一次拷贝
  2. 手动引用计数维护内存可以减少FGC

Recycler对象池

背景

Netty选择使用堆外内存存储网络通信数据

  1. 在JVM堆中创建一个用于引用native memory的引用对象DirectByteBuffer
  2. 使用native方法unsafe.allocateMemory通过底层malloc系统调用申请一块堆外内存, 然后被DirectByteBuffer引用

Netty面对的高并发网络通信场景下, 申请堆外内存是一个非常频繁的操作, 这种大量频繁的内存申请释放操作对程序的性能影响是巨大的, 所以Netty就引入了内存池对内存相关的操作进行统一的管理

对象在JVM中创建和回收开销

略过

对象池Recycler

对象池接口

Netty中每个被池化的对象中都会引用对象池的实例RECYCLER

每个池化对象中都会包含一个recyclerHandle, 是池化对象在对象池中的句柄, 是由对象池在创建对象后传递进来的

  • 获取对象: RECYCLER.get()
  • 删除对象: recyclerHandle.recycle(this)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Channel#write使用的缓存ChannelOutboundBuffer中保存的对象Entry
// 以Entry为例看一下如何使用对象池
static final class Entry {
// 构造
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(handle -> return new Entry(handle));
private final Handle<Entry> recyclerHandle;
private Entry(Handle<Entry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

// 申请对象
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}

// 回收对象
void recycle() {
next = null;
bufs = null;
buf = null;
msg = null;
promise = null;
progress = 0;
total = 0;
pendingSize = 0;
count = -1;
cancelled = false;
handle.recycle(this);
}
}

Recycler总体设计

Recycler总体设计

获取对象无锁化设计

每个线程拥有一个独立Stack, 这样当多个线程并发从对象池中获取对象时, 都是从自己线程中的Stack中获取, 全程无锁化运行, JVM的堆内存分配也是这样(TLAB), 但是JVM提供的堆外内存分配为了防止内存泄漏是全局加锁的(堆外不能自动GC)

释放对象无锁化设计

thread2释放thread1获取的对象, 就需要操作thread1的stack, 可能产生多线程冲突

引入WeakOrderQueue链表, 每个WeakOrderQueue表示一个其他线程回收的该线程的对象, 获取对象时取不到对象就去WeakOrderQueue

对象池回收对象的一个原则就是对象由谁创建的, 最终就要被回收到创建线程对应的Stack结构中的数组栈中, 数组栈中存放的才是真正被回收的池化对象, 可以直接被取出复用, 回收线程只能将待回收对象暂时存放至创建线程对应的Stack结构中的WeakOrderQueue链表中, 当数组栈中没有对象时, 由创建线程将WeakOrderQueue链表中的待回收对象转移至数组栈中

WeakOrderQueue链表, 创建线程使用head指针获取可以被回收的对象, 回收线程使用tail指针插入需要被回收的对象, Netty这里为了不引入多线程同步的开销, 只会保证待回收对象的最终可见性, 会存在线程可见性的问题(维护线程之间操作的原子性, 可见性都是需要开销的, netty为了提高多线程的运行效率, 避免引入不必要的同步开销)

Recycler实现

Handle是池化对象在对象池中的一个模型, 默认实现是DefaultHandle

stackWeakOrderQueue中存的就是handle

在创建对象池的时候, 需要通过ObjectCreator#newObject方法指定对象池创建对象的行为即Handle

创建ObjectPool

Netty发送数据流程

write方法发送数据

write事件传播流程

  • channelHandlerContext.write()方法会从当前ChannelHandler开始在pipeline中向前传播write事件直到HeadContext
  • channelHandlerContext.channel().write()会从pipeline的尾结点TailContext开始在pipeline中向前传播write事件直到HeadContext

Netty的写操作是一个异步操作, 当我们在业务线程中调用channelHandlerContext.write()后, Netty会给我们返回一个ChannelFuture, 我们可以在这个ChannelFutrue中添加ChannelFutureListener, 这样要发送的数据发送到底层Socket中时, Netty会通过ChannelFutureListener通知我们写入结果

当异步事件在pipeline传播的过程中发生异常时就会停止传播。所以我们在日常开发中, 需要对写操作异常情况进行处理

Flush

发送数据

Socket写满16次未写完

处理OP_WRITE

writeAndFlush

Pipeline, 详解所有 IO 事件的触发时机以及传播路径

Pipeline的创建

HeadContext

TailContext

TailContext作为一个ChannelHandlerContext

TailContext作为一个ChannelInbondHandler

Pipeline中的事件

Inbond事件

OutBond事件

向Pipeline添加ChannelHandler

ChannelHandlerContext的创建

从pipeline删除ChannelHandler

初始化pipeline

事件传播

Netty 如何处理 TCP 连接的正常关闭, 异常关闭, 半关闭场景

TCP连接正常关闭

Netty处理TCP连接正常关闭

TCP连接异常关闭

Netty对RST包的处理

TCP连接半关闭HalfClosure

主动关闭方发起TCP半关闭

被动关闭放处理TCP半关闭


聊聊Netty那些事儿之从内核角度看IO模型

聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)

详细图解Netty Reactor启动全流程

一文聊透Netty核心引擎Reactor的运转架构

抓到Netty一个Bug, 聊一下Netty是如何高效接收网络连接的

Netty如何高效接收网络数据?ByteBuffer动态自适应扩缩容机制

抓到Netty一个内存泄露Bug | 详解Recycler对象池的设计与实现

Netty发送数据全流程

Netty IO 事件的编排利器 pipeline | 所有 IO 事件的触发时机以及传播路径

Netty 如何应对 TCP 连接的正常关闭, 异常关闭, 半关闭场景