Netty源码阅读2——NioEventLoop执行过程

Posted by 皮皮潘 on 12-03,2021

NioEventLoop的执行过程抽象如下:

for (;;) {
   select(wakenUp.getAndSet(false));
   if (wakenUp.get()) 
      selector.wakeup();
   processSelectedKeys();
   runAllTasks(...);
}

线程的执行主要分为三个阶段:1. 调用select方法轮询selector上的所有的channel的IO事件 2. 处理产生了事件的Channel 3. 处理任务队列

阶段一:select等待IO事件

select方法调用的内部其实也是一个for循环,其中调用了jdk原生的selector的select方法,但同时也插入了一些逻辑在里面,也即检查是否有定时任务和普通任务,保证了Netty的任务队列中的任务得到有效执行,每次for循环中具体执行步骤如下:

  1. 查看是否有定时任务的截至时间快到了(<=0.5ms),如果有定时任务需要执行了且selector没有被调用过,则调用selectNow方法并跳出循环,其中定时任务按照延迟时间从小到大排序存在PriorityQueue中

  2. 查看是否有任务加入,如果发现有任务加入且selector没有被调用过,则调用selectNow方法并跳出循环,

  3. 执行阻塞式selector操作:selector.select(),执行完成后设置selectCnt ++,并判断是否能跳出循环,其中为了防止select方法阻塞太长时间,当对EventLoop提交任务的时候会调用wakeup方法,其中wakeup方法使得当有线程在执行selector.slect()时,该方法会立即返回,当没有线程在执行时,则下一次执行select方法会立即方法

  4. 通过对于空轮询(没有事件发生select方法却返回了)计数,如果超过阈值则rebuildSelector从而解决jdk的NIO的bug,rebuildSelector的具体实现则是先创建一个新的Selector,然后遍历旧Selector中所有的key,取消掉在旧Selector的注册然后注册到新的Selector

阶段二:处理Selected Keys

在初始化的时候,Netty在openSelector方法中通过反射的方式将SelectorImpl中Set类型的selectedKeys字段以及publicSelectedKeys字段用Netty自己定义的SelectedSelectionKeySet类型实例进行替换从而完成优化,并将对应实例直接赋值给了NioEventLoop中的selectedKeys字段,在SelectedSelectionKeySet中使用数组替换了HashSet从而当keys非常多的时候依然可以在O(1)复杂度下进行key的插入,而HashSet由于哈希碰撞,可能需要O(logn)

具体处理步骤如下:

  1. 遍历取出IO事件SelectionKey以及其上的attachment,由于在创建一个AbstractNioChannel的时候会通过registor方法将自身注册到EventLoop的Selector上,而在注册过程中会调用AbstractNioChannel#doRegister方法,其中会将AbstractNioChannel自身作为对应的attachment,因此attachment即为对应的AbstractNioChannel

  2. 处理attachement对应的AbstractNioChannel:对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理;对于worker NioEventLoop来说,轮询到的基本上都是io读写事件:1. 如果是读事件则后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理,也即回调到用户方法;2. 如果是写事件,具体的输出数据其实之前已经在channelHandler中都处理好了,此时直接flush即可

  3. 判断是否需要再次Select:对于每个EventLoop而言,每隔256个Channel从selector上移除的时候,就标记needsToSelectAgain为true,此时将所有selectedKeys情况,并selectAgain

阶段三:处理任务队列

用户有三种方式提交任务:

  1. 在EventLoop所在线程提交任务:ctx.channel().eventLoop().execute(new Runnable());

  2. 在其他线程提交任务:也即调用channel的各种方法,如:channel.write()

  3. 在EventLoop所在线程提交定时任务:ctx.channel().eventLoop().schedule(new Runnable())

当用户提交任务的时候,Netty会把任务push到EventLoop的taskQueue中,taskQueue在NioEventLoop中默认是mpsc队列,即多生产者单消费者队列,netty使用mpsc方便的将外部线程的task聚集,在EventLoop线程内部用单线程来串行执行。当用户提交定时任务时一般都在EventLoop所在的线程中,此时不需要担心并发的问题,直接把定时任务存在优先队列中即可(优先队列通过在创建ScheduledTask计算的到绝对DeadlineNanos进行比较),但是对于提交定时任务时不在EventLoop所在的线程的场景,Netty会把定时任务包装成一个添加定时任务的任务,并放入mpsc队列中,从而解决了并发的问题。

当Netty实际执行任务的时候,首先需要assert执行线程等于EventLoop绑定线程从而保证mpsc的正确性,然后会将快到期的ScheduledTask放入TaskQueue中,接下来从TaskQueue中拿出Task并执行直到没有Task可以执行或者超出结束时间,其中每执行64个Task才会暂停一下将当前时间和结束时间(通过IORatio计算得到,为了保证IO和CPU时间之间的比例,一般为50,也即IO时间和CPU时间1比1)进行对比

  • 当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行

  • netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue

  • netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue

  • netty每隔64个任务检查一下是否该退出任务循环