背景
Netty将IO事件按照流向划分为两个部分:Inbound入站事件和Outbound出站事件。入站事件由外部触发,包括通道注册(register)、通道激活(active)、数据可读(read)、通道异常(exceptionCaught)等;出站事件由程序主动触发,如连接的建立(bind, connect)与关闭(closh),数据的写出(write、flush)等。
先整体上看, Netty的每个通道都独有一个Pipeline,通道的出站和入站事件在Pipeline中流动和处理,如下所示:
上图包含了Netty引入的ChannelHandler、ChannelHandlerContext、Pipeline等三个概念:
[1] ChannelHandler: 真正处理读写事件的类;
[2] ChannelHandlerContext: 每个ctx(ChannelHandlerContext的缩写)内部包含一个ChannelHandler;
[3] Pipeline内部维护一个元素类型为ChannelHandlerContext的双向链表队列,队头位HeadContext, 队尾为TailContext, 可以在二者之间添加业务ctx;
本文的重点内容是结合Netty代码对上图进行介绍。
1.ChannelHandler
从整体上看,IO事件和数据在Pipeline的ctx间流动,实际是在ctx的ChannelHandler节点之间流动。每个ChannelHandler都有机会选择是否处理流动的事件,可以选择:忽略事件、处理事件并中断流动(不再传递)、处理事件并继续传递事件。事件分类为入站事件和出站事件,入站ChannelHandler只能处理入站事件,出站ChannelHandler只能处理出站事件。
1.1 ChannelHandler接口
ChannelHandler是Netty中的一个核心接口,被用来拦截和处理各种IO事件,如通道的注册、激活、数据的读写等。使用Netty框架开发程序,本质是开发ChannelHandler,并将自定义的ChannelHandler注册到Netty框架中。ChannelHandler的接口定义如下:
public interface ChannelHandler {void handlerAdded(ChannelHandlerContext ctx) throws Exception;void handlerRemoved(ChannelHandlerContext ctx) throws Exception;// ...
}
包含两个接口,handlerAdded和handlerRemoved,子类可以将ChannelHandler注册到系统以及从系统移除时需要执行的逻辑实现在这两个方法中。
ChannelHandler的核心功能的定义在两个子类中,ChannelInboundHandler处理入站事件,ChannelOutboundHandler处理出站事件:
ChannelInboundHandler入站接口:
public interface ChannelInboundHandler extends ChannelHandler {void channelRegistered(ChannelHandlerContext ctx) throws Exception;void channelUnregistered(ChannelHandlerContext ctx) throws Exception;void channelActive(ChannelHandlerContext ctx) throws Exception;void channelInactive(ChannelHandlerContext ctx) throws Exception;void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;void channelReadComplete(ChannelHandlerContext ctx) throws Exception;void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;// ... 以及userEventTriggered, channelWritabilityChanged等接口
}
负责处理的事件包括:通道注册到选择器时、通道激活(连接成功)时、数据可读以及数据读取完毕时。
ChannelOutboundHandler出站接口:
public interface ChannelOutboundHandler extends ChannelHandler {void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception;void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;void flush(ChannelHandlerContext ctx) throws Exception;//... 以及read, disconnect, deregister等接口
}
负责处理的事件包括:连接的建立(bind, connect)与关闭(closh),数据的写出(write、flush)。
实现ChannelInboundHandler和ChannelOutboundHandler接口的类将同时具备处理入站和出站事件的能力。
1.2 案例介绍
程序使用了3个ChannelInboundHandler子类, 分别是ChannelInboundHandlerFirst, ChannelInboundHandlerSecond, ChannelInboundHandlerThird,依次添加到Pipeline中,构成的pipeline如下图所示(省去字符串编解码):
有消息到达时,消息的流向按如下顺序: HeadContext -> ChannelInboundHandlerFirst -> ChannelInboundHandlerSecond -> ChannelInboundHandlerThird -> TailContext.
3个ChannelInboundHandler子类的代码依次为:
public class ChannelInboundHandlerFirst extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 打印消息System.out.println(msg);// 修改消息,并继续向下传递消息ctx.fireChannelRead("First_" + msg);}
}public class ChannelInboundHandlerSecond extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 仅打印消息,不继续传递消息System.out.println(msg);}
}public class ChannelInboundHandlerThird extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 仅打印消息,不继续传递消息System.out.println(msg);}
}
当客户端发送"hello server"字符串给服务器时:
Microsoft Telnet> send hello server
发送字符串 hello server
Microsoft Telnet>
服务器日志打印为:
[First] hello world
[Second] First_hello world
结果分析:
Pipeline对应的通道有消息可读时,消息进入Pipeline:
[1] HeadContext不处理消息,直接传递给ChannelInboundHandlerFirst;
[2] ChannelInboundHandlerFirst接收到消息后,消费消息(打印), 修改消息,并继续向下游触发消息;
[3] ChannelInboundHandlerSecond收到ChannelInboundHandlerFirst触发的消息,仅消费消息(打印),不向下游触发, 消息至此中断;
[4] ChannelInboundHandlerThird因没有上游触发消息,channelRead方法永远不会被调用。
2.ChannelHandlerContext
Netty中我们只需要关注4个ChannelHandlerContext子类,其次为AbstractChannelHandlerContext、DefaultChannelHandlerContext、HeadContext和TailContext,其类结构关系图如下所示:
ChannelHandlerContext接口类定义了ctx规范,核心实现在抽象类AbstractChannelHandlerContext中,HeadContext和TailContext分别作为Pipeline中双向链表首尾两个节点,其他所有的节点类型为DefaultChannelHandlerContext类型。
本章内容先介绍ChannelHandlerContext接口,再介绍实现类DefaultChannelHandlerContext、HeadContext和TailContext,最后再介绍抽象类AbstractChannelHandlerContext,以便突出AbstractChannelHandlerContext的核心作用。
2.1 ChannelHandlerContext接口
介绍ChannelHandlerContext接口之前,有必要简单介绍一下其父类ChannelInboundInvoker和ChannelOutboundInvoker。ChannelInboundInvoker和ChannelOutboundInvoker分别定义了主动触发Inbound和Outbound事件的方法。如当可读消息到达当前节点,并需要向下级传递时,需要通过Invoker。
ChannelHandlerContext对象(后续使用ctx表示),每个ctx都有关联的通道channel、所属的Pipeline、所包含的处理器handler、内部使用的线程池executor,接口需要定义获取这些组件的方法。
ctx还需要拥有向上游或者下游传递事件和消息的能力,因此需要继承ChannelInboundInvoker和ChannelOutboundInvoker接口。
ChannelHandlerContext接口定义如下所示:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {Channel channel();ChannelPipeline pipeline();ChannelHandler handler();EventExecutor executor();boolean isRemoved();@Override// 重写ChannelInboundInvoker, ChannelOutboundInvoker的方法,修改返回值为ChannelHandlerContext// fireChannelRegistered/fireChannelUnregistered、fireChannelActive/fireChannelInactive、fireChannelRead/fireChannelReadComplete、fireExceptionCaught、fireChannelWritabilityChanged/fireUserEventTriggered、read/flushChannelHandlerContext fireChannelRegistered();//...
}
另外,引入了isRemoved方法表示ctx是否从所属的pipeline中被移除了。
2.2 DefaultChannelHandlerContext
DefaultChannelHandlerContext实现类极其简单,内部维护一个ChannelHandler类型的处理器handler,重写handler()返回该handler对象。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, handler.getClass());this.handler = handler;}@Overridepublic ChannelHandler handler() {return handler;}
}
其构造函数在DefaultChannelPipeline中的newContext被调用,用于构造ctx.
2.3 HeadContext和TailContext
HeadContext:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);unsafe = pipeline.channel().unsafe();setAddComplete();}@Overridepublic ChannelHandler handler() {return this;}// handlerRemoved和handlerAdded实现为空@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {}//...// 【outbound】bind/connect/disconnect/close/deregister/read/write/flush@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}//...// 【inbound】channelRegistered/channelUnregistered、channelActive/channelInactive、channelRead/channelReadComplete、userEventTriggered/channelWritabilityChanged、exceptionCaught@Overridepublic void channelRegistered(ChannelHandlerContext ctx) {invokeHandlerAddedIfNeeded();ctx.fireChannelRegistered();}//...
}
整体上看,HeadContext属于ChannelInboundHandler也属于ChannelOutboundHandler,即可处理入站和出站事件:
[1] HeadContext内部维持了一个unbsafe对象,取自pipeline关联的通道的unsafe对象(后续文章介绍),HeadContext将内部所有的outbound事件委托给unsafe处理, 如上述源码的bind方法;
[2] 对于所有的Inbound消息,HeadContext直接通过Invoker向下游传递。
TailContext:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, TailContext.class);setAddComplete();}@Overridepublic ChannelHandler handler() {return this;}// channelRegistered/channelUnregistered、channelActive/channelInactive、handlerAdded/handlerRemoved、userEventTriggered/exceptionCaught、channelRead/channelReadComplete// 实现为空,即消息到此丢弃,不再向前或向后传递@Overridepublic void channelRegistered(ChannelHandlerContext ctx) { }//...
}
整体上看,TailContext是一个ChannelInboundHandler,对于所有的消息直接忽略,不向前和前后传递任何消息。
2.4 AbstractChannelHandlerContext抽象类
由于AbstractChannelHandlerContext承载的内容较多,为理解方便,对其按照功能进行拆分,分别介绍。
2.4.1 获取channel、Pipeline、handler、executor方法
channel、Pipeline、handler、executor是ctx工作的基础:
[1] 内部维持一个DefaultChannelPipeline类型的pipeline对象,赋值来自构造函数;
[2] channel与pipeline相互绑定, 因此可通过pipeline获取channel;
[3] handler在子类中实现;
[4] executor来自构造函数,一般入参为空,使用channel绑定的NioEventLoop线程池对象;
对应的方法实现如下:
@Override
public Channel channel() {return pipeline.channel();
}@Override
public ChannelPipeline pipeline() {return pipeline;
}@Override
public EventExecutor executor() {if (executor == null) {return channel().eventLoop();} else {return executor;}
}
2.4.2 next和prev属性
Pipeline内部包含一个ChannelHandlerContext类型的双向链表,ctx通过next和prev的指向将多个ctx串连成链表,属性定义如下:
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
因此,pipeline内部仅需要维护首尾节点即可。
2.4.3 executionMask属性和findContextInbound和findContextOutbound方法
Netty引入executionMask表示该ctx对象可处理的事件类型。findContextInbound(mask)用于沿着head->ctx->tail的顺序查找下一个可以处理mask类型Inbound事件的ctx对象;findContextOutbound沿着反向(tail -> ctx -> head)顺序查找上一个可以处理mask;类型Outbound事件的ctx对象。
先看一下mask的定义:
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
//... 从1 << 1定义至1 << 16,用于表示不同的读写事件
static final int MASK_CHANNEL_READ = 1 << 5;
//...
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
具体为每个读写事件定义了一个宏定义,MASK_ONLY_INBOUND包括所有的Inbound事件,MASK_ALL_OUTBOUND包括所有的Outbound事件,MASK_ALL_INBOUND包括所有的Inbound和Outbound事件。
ctx对象在构造阶段,通过ChannelHandler的类型确定executionMask属性值:
this.executionMask = mask(handlerClass);static int mask(Class<? extends ChannelHandler> clazz) {Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();Integer mask = cache.get(clazz);if (mask == null) {mask = mask0(clazz);cache.put(clazz, mask);}return mask;
}
mask对mask0添加了一层缓存优化:
private static int mask0(Class<? extends ChannelHandler> handlerType) {int mask = MASK_EXCEPTION_CAUGHT;if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_INBOUND;if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_REGISTERED;}//... 省略相同逻辑:channelUnregistered、channelActive/channelInactive、channelRead/channelReadComplete、channelWritabilityChanged/userEventTriggeredif (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}}if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_OUTBOUND;if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_BIND;}//... 省略相同逻辑:connect/disconnect、close/deregister, read、write/flushif (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}}//...return mask;
}
逻辑较为简单,根据ChannelHandler的类型和是否实现对应的方法(不包括从父类继承的)计算mask值。如前面提到的ChannelInboundHandlerFirst:
public class ChannelInboundHandlerFirst extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 打印消息System.out.println(msg);// 修改消息,并继续向下传递消息ctx.fireChannelRead("First_" + msg);}
}
属于Inbound类型,且仅实现了channelRead方法,因此ChannelInboundHandlerFirst最终计算得到的mask值为32, 即宏定义MASK_CHANNEL_READ的值.
findContextInbound和findContextOutbound方法根据mask的值沿着pipeline的ctx链查找满足条件的ctx对象。
如findContextInbound(MASK_CHANNEL_READ)和findContextInbound(MASK_CHANNEL_READ_COMPLETE)表示查找下一个可以处理channelRead和channelReadComplete事件的InboundHandler, 具体实现如下:
private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {// 从当前ctx的下一个向后搜索ctx = ctx.next;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));return ctx;
}private AbstractChannelHandlerContext findContextOutbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {// 从当前ctx的上一个向前搜索ctx = ctx.prev;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));return ctx;
}
2.4.4 handlerState状态属性
引入handlerState属性记录加入Pipeline的状态,存在4种类型:
// 初始状态
private static final int INIT = 0;
// 正在加入到Pipeline
private static final int ADD_PENDING = 1;
// 已成功加入Pipeline
private static final int ADD_COMPLETE = 2;
// 已成功从pipeline移除
private static final int REMOVE_COMPLETE = 3;
handlerState初始值为INIT,引入一个原子对象用于修改handlerState的值:
private volatile int handlerState = INIT;private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
提供了三个方法(提供给Pipeline对象)用于设置状态值,以及提供一个方法查询是否被移除:
// 从INIT状态设置为ADD_PENDING状态
final void setAddPending() {boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);assert updated;
}// 设置为REMOVE_COMPLETE状态
final void setRemoved() {handlerState = REMOVE_COMPLETE;
}// 设置为ADD_COMPLETE状态
final boolean setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE) {return false;}if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return true;}}
}
// 查询是否被移除
public boolean isRemoved() {return handlerState == REMOVE_COMPLETE;
}
2.4.5 处理Inbound和Outbound事件
ChannelHandlerContext因实现了ChannelInboundInvoker接口具备了触发Inbound事件的能力,包括fireChannelRegistered/fireChannelUnregistered, fireChannelActive/fireChannelInactive, fireChannelRead/fireChannelReadComplete等。通过调用ctx对象的这些方法可以将消息以Inbound事件的方式在ctx链路间中传递,即将消息发送给Pipeline处理。
由于机制相同,本文仅以channelRead为案例进行介绍。
调用ctx的fireChannelRead时通过msg参数传递消息:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;
}
findContextInbound(MASK_CHANNEL_READ)
前文已介绍:从当前ctx(不包括当前)向后找,找到下一个可以处理channelRead的ctx对象。
继续根据invokeChannelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {// pipeline对特殊类型的处理,这里可以理解为直接返回msg(不处理)final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
这里判断当前线程是否是ctx的线程池线程,如果是直接执行next.invokeChannelRead(m)
, 否则提交给ctx的线程池线程执行;从而确保了next.invokeChannelRead(m)任务只能由ctx的线程池执行。
使用Netty框架时,向pipeline添加ChannelHandler时一般不会额外指定线程池,因此ctx对象的线程池对象就是channel绑定的NioEventLoop对象;因此,此处逻辑为直接执行next.invokeChannelRead(m)
.
跟进next.invokeChannelRead(m);
语句,实现如下:
private void invokeChannelRead(Object msg) {// 根据状态判断是否可以处理if (invokeHandler()) {try {// 进入Handler的channelRead方法((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {invokeExceptionCaught(t);}} else {// 当前handler不处理,继续向下传递fireChannelRead(msg);}
}
逻辑较为清晰,通过invokeHandler()
判断当前ctx状态(handlerState)是否可以处理(是否已加入Pipeline),如果可以获取ctx的handler, 并调用handler的channelRead处理msg消息,否者将消息通过fireChannelRead继续向下传递。
处理Outbound事件与处理Inbound事件相同,不再单独展开介绍。
3.ChannelPipeline
整体上看,ChannelPipeline与ctx和handler的关系如下图所示:
在理解ChannelHandler和ctx后,可以将Pipeline理解为对二者的封装和增强。需要对外提供添加handler到pipeline链的方法以及触发Inbound和Outbound事件的方法,因此ChannelPipeline也需要继承ChannelInboundInvoker和ChannelOutboundInvoker。
接口定义如下:
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {ChannelPipeline addFirst(ChannelHandler... handlers);ChannelPipeline addLast(ChannelHandler... handlers);// ... 定义在pipeline增删改查ChannelHandler的接口// 重写ChannelInboundInvoker和ChannelOutboundInvoker方法,返回值修改为ChannelPipeline// 重写的方法有: //fireChannelRegistered/fireChannelUnregistered、fireChannelActive/fireChannelInactive、fireChannelRead/fireChannelReadComplete、fireExceptionCaught/fireUserEventTriggered、fireChannelWritabilityChanged、flush@OverrideChannelPipeline fireChannelRegistered();// ...
}
核心接口可以分为两类:
[1] 将ChannelHandler添加到pipeline的ctx链路;
[2] ChannelInboundInvoker和ChannelOutboundInvoker提供的invoke方法;
由于AbstractChannelHandlerContext承载的内容较多,为理解方便,对其按照功能进行拆分,分别介绍。
3.1 head和tail属性
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;protected DefaultChannelPipeline(Channel channel) {//...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}
创建DefaultChannelPipeline对象时,实例化TailContext和HeadContext,并相互指向。
3.2 channel
pipeline与channel相互持有,每个channel都有一个所属的pipeline.
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");//...
}
构造DefaultChannelPipeline时需要传入channel对象。
3.3 registered注册状态
DefaultChannelPipeline中的registered表示该Pipeline关联的channel是否已被注册成功(注册到选择器)。当注册成功时,registered会被设置为true.
3.4 添加handler
pipeline重载了很多添加handler的方法,以addLast(ChannelHandler)为例进行介绍:
public final ChannelPipeline addLast(ChannelHandler handler) {return addLast(null, handler);
}public final ChannelPipeline addLast(String name, ChannelHandler handler) {return addLast(null, name, handler);
}
继续跟进重载的last方法, 此时入参group和name为空:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}callHandlerAdded0(newCtx);return this;
}
逻辑可以分为四个部分:
[1] 添加前的校验: checkMultiplicity校验非注解Sharable的ChannelHandlerAdapter类型的Handler对象是否重复添加, 对其他类型不进行校验。
[2] 构造ctx对象:newContext方法根据传入ChannelHandler构造ctx对象;
[3] 加入pipeline链表:addLast0将新创建的ctx添加值tail的前驱结点;
[4] 加入后逻辑:根据pipeline是否已注册、当前线程是否为ctx线程池线程确定不同的后续逻辑。
checkMultiplicity、newContext和addLast0逻辑较为简单,不进行介绍,这里重点关注一下第四部分。
pipeline未注册:
当pipeline未注册时(关联的channel未注册到选择器),按如下逻辑执行:
if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;
}
将ctx对象的状态设置为加入中AddPending,并调用callHandlerCallbackLater将任务保存到内存中,以便注册后取出来执行。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {assert !registered;PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);PendingHandlerCallback pending = pendingHandlerCallbackHead;if (pending == null) {pendingHandlerCallbackHead = task;} else {while (pending.next != null) {pending = pending.next;}pending.next = task;}
}
将ctx对象包装为PendingHandlerAddedTask对象,添加到pendingHandlerCallbackHead队尾。这里的pendingHandlerCallbackHead属性保存了任务链表,当pipeline被注册后,可通过调用pipeline的invokeHandlerAddedIfNeeded方法按序执行这些任务。
final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();if (firstRegistration) {firstRegistration = false;callHandlerAddedForAllHandlers();}
}
firstRegistration布尔属性保证了callHandlerAddedForAllHandlers只会被调用一次,跟进callHandlerAddedForAllHandlers方法:
private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;registered = true;pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;this.pendingHandlerCallbackHead = null;}PendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {task.execute();task = task.next;}
}
registered属性设置为true后从pendingHandlerCallbackHead属性取出PendingHandlerAddedTask任务,并依次执行。查看PendingHandlerAddedTask的execute方法:
void execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {try {executor.execute(this);} catch (RejectedExecutionException e) {if (logger.isWarnEnabled()) {logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",executor, ctx.name(), e);}atomicRemoveFromHandlerList(ctx);ctx.setRemoved();}}
}
将callHandlerAdded0(ctx);
方法调用委托给ctx的线程池(如果当前线程是ctx的线程池,直接执行)。
pipeline已注册:
已注册时,整理得到的执行逻辑如下:
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;
} else {callHandlerAdded0(newCtx);return this;
}
与PendingHandlerAddedTask的execute方法的逻辑类似,将callHandlerAdded0(newCtx);的执行任务交给ctx的线程池。
callHandlerAdded0方法删除catch的异常逻辑后代码如下:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {//... 省略异常处理逻辑}
}final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);}
}
核心逻辑为调用ctx关联的channelHandler对象的handlerAdded方法。
一般而言,自定义channelHandler的handlerAdded方法的实现为空;这里是为了处理ChannelInitializer这类特殊的channelHandler子类:
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);}}}
}
这里的步骤是调用ChannelInitializer的initChannel方法(细节在netty启动流程文章中介绍)。
3.6 pipeline的封装
pipeline的作用是处理channel相关的IO消息,在Netty框架中对应Inbound和Outbound事件;因此需要向外暴露触发Inbound和Outbound事件的接口,以便向消息发送给Pipeline处理。
逻辑相同,本文仅以fireChannelRead为例介绍。
public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}
这里的head是pipeline的ctx链路的首节点,跟踪AbstractChannelHandlerContext的invokeChannelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
通过executor.inEventLoop()判断:非ctx线程—将核心逻辑提交给ctx线程池处理,否则直接执行。核心逻辑通过调用head的invokeChannelRead方法,将msg消息沿着head向下传递,消息传递过程在本章第2章已介绍过,不再赘述。