Netty系列-4 Pipeline和Handler

背景

Netty将IO事件按照流向划分为两个部分:Inbound入站事件和Outbound出站事件。入站事件由外部触发,包括通道注册(register)、通道激活(active)、数据可读(read)、通道异常(exceptionCaught)等;出站事件由程序主动触发,如连接的建立(bind, connect)与关闭(closh),数据的写出(write、flush)等。
先整体上看, Netty的每个通道都独有一个Pipeline,通道的出站和入站事件在Pipeline中流动和处理,如下所示:
在这里插入图片描述上图包含了Netty引入的ChannelHandler、ChannelHandlerContext、Pipeline等三个概念:
[1] ChannelHandler: 真正处理读写事件的类;
[2] ChannelHandlerContext: 每个ctx(ChannelHandlerContext的缩写)内部包含一个ChannelHandler;
[3] Pipeline内部维护一个元素类型为ChannelHandlerContext的双向链表队列,队头位HeadContext, 队尾为TailContext, 可以在二者之间添加业务ctx;
本文的重点内容是结合Netty代码对上图进行介绍。

1.ChannelHandler

从整体上看,IO事件和数据在Pipeline的ctx间流动,实际是在ctx的ChannelHandler节点之间流动。每个ChannelHandler都有机会选择是否处理流动的事件,可以选择:忽略事件、处理事件并中断流动(不再传递)、处理事件并继续传递事件。事件分类为入站事件和出站事件,入站ChannelHandler只能处理入站事件,出站ChannelHandler只能处理出站事件。

1.1 ChannelHandler接口

ChannelHandler是Netty中的一个核心接口,被用来拦截和处理各种IO事件,如通道的注册、激活、数据的读写等。使用Netty框架开发程序,本质是开发ChannelHandler,并将自定义的ChannelHandler注册到Netty框架中。ChannelHandler的接口定义如下:

public interface ChannelHandler {void handlerAdded(ChannelHandlerContext ctx) throws Exception;void handlerRemoved(ChannelHandlerContext ctx) throws Exception;// ...
}

包含两个接口,handlerAdded和handlerRemoved,子类可以将ChannelHandler注册到系统以及从系统移除时需要执行的逻辑实现在这两个方法中。
ChannelHandler的核心功能的定义在两个子类中,ChannelInboundHandler处理入站事件,ChannelOutboundHandler处理出站事件:
ChannelInboundHandler入站接口:

public interface ChannelInboundHandler extends ChannelHandler {void channelRegistered(ChannelHandlerContext ctx) throws Exception;void channelUnregistered(ChannelHandlerContext ctx) throws Exception;void channelActive(ChannelHandlerContext ctx) throws Exception;void channelInactive(ChannelHandlerContext ctx) throws Exception;void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;void channelReadComplete(ChannelHandlerContext ctx) throws Exception;void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;// ... 以及userEventTriggered, channelWritabilityChanged等接口
}

负责处理的事件包括:通道注册到选择器时、通道激活(连接成功)时、数据可读以及数据读取完毕时。
ChannelOutboundHandler出站接口:

public interface ChannelOutboundHandler extends ChannelHandler {void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception;void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;void flush(ChannelHandlerContext ctx) throws Exception;//... 以及read, disconnect, deregister等接口
}

负责处理的事件包括:连接的建立(bind, connect)与关闭(closh),数据的写出(write、flush)。
实现ChannelInboundHandler和ChannelOutboundHandler接口的类将同时具备处理入站和出站事件的能力。

1.2 案例介绍

程序使用了3个ChannelInboundHandler子类, 分别是ChannelInboundHandlerFirst, ChannelInboundHandlerSecond, ChannelInboundHandlerThird,依次添加到Pipeline中,构成的pipeline如下图所示(省去字符串编解码):
在这里插入图片描述 有消息到达时,消息的流向按如下顺序: HeadContext -> ChannelInboundHandlerFirst -> ChannelInboundHandlerSecond -> ChannelInboundHandlerThird -> TailContext.
3个ChannelInboundHandler子类的代码依次为:

public class ChannelInboundHandlerFirst extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 打印消息System.out.println(msg);// 修改消息,并继续向下传递消息ctx.fireChannelRead("First_" + msg);}
}public class ChannelInboundHandlerSecond extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 仅打印消息,不继续传递消息System.out.println(msg);}
}public class ChannelInboundHandlerThird extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 仅打印消息,不继续传递消息System.out.println(msg);}
}

当客户端发送"hello server"字符串给服务器时:

Microsoft Telnet> send hello server
发送字符串 hello server
Microsoft Telnet>

服务器日志打印为:

[First] hello world
[Second] First_hello world

结果分析:
Pipeline对应的通道有消息可读时,消息进入Pipeline:
[1] HeadContext不处理消息,直接传递给ChannelInboundHandlerFirst;
[2] ChannelInboundHandlerFirst接收到消息后,消费消息(打印), 修改消息,并继续向下游触发消息;
[3] ChannelInboundHandlerSecond收到ChannelInboundHandlerFirst触发的消息,仅消费消息(打印),不向下游触发, 消息至此中断;
[4] ChannelInboundHandlerThird因没有上游触发消息,channelRead方法永远不会被调用。

2.ChannelHandlerContext

Netty中我们只需要关注4个ChannelHandlerContext子类,其次为AbstractChannelHandlerContext、DefaultChannelHandlerContext、HeadContext和TailContext,其类结构关系图如下所示:
在这里插入图片描述ChannelHandlerContext接口类定义了ctx规范,核心实现在抽象类AbstractChannelHandlerContext中,HeadContext和TailContext分别作为Pipeline中双向链表首尾两个节点,其他所有的节点类型为DefaultChannelHandlerContext类型。
本章内容先介绍ChannelHandlerContext接口,再介绍实现类DefaultChannelHandlerContext、HeadContext和TailContext,最后再介绍抽象类AbstractChannelHandlerContext,以便突出AbstractChannelHandlerContext的核心作用。

2.1 ChannelHandlerContext接口

介绍ChannelHandlerContext接口之前,有必要简单介绍一下其父类ChannelInboundInvoker和ChannelOutboundInvoker。ChannelInboundInvoker和ChannelOutboundInvoker分别定义了主动触发Inbound和Outbound事件的方法。如当可读消息到达当前节点,并需要向下级传递时,需要通过Invoker。
ChannelHandlerContext对象(后续使用ctx表示),每个ctx都有关联的通道channel、所属的Pipeline、所包含的处理器handler、内部使用的线程池executor,接口需要定义获取这些组件的方法。
ctx还需要拥有向上游或者下游传递事件和消息的能力,因此需要继承ChannelInboundInvoker和ChannelOutboundInvoker接口。
ChannelHandlerContext接口定义如下所示:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {Channel channel();ChannelPipeline pipeline();ChannelHandler handler();EventExecutor executor();boolean isRemoved();@Override// 重写ChannelInboundInvoker, ChannelOutboundInvoker的方法,修改返回值为ChannelHandlerContext// fireChannelRegistered/fireChannelUnregistered、fireChannelActive/fireChannelInactive、fireChannelRead/fireChannelReadComplete、fireExceptionCaught、fireChannelWritabilityChanged/fireUserEventTriggered、read/flushChannelHandlerContext fireChannelRegistered();//...
}

另外,引入了isRemoved方法表示ctx是否从所属的pipeline中被移除了。

2.2 DefaultChannelHandlerContext

DefaultChannelHandlerContext实现类极其简单,内部维护一个ChannelHandler类型的处理器handler,重写handler()返回该handler对象。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, handler.getClass());this.handler = handler;}@Overridepublic ChannelHandler handler() {return handler;}
}

其构造函数在DefaultChannelPipeline中的newContext被调用,用于构造ctx.

2.3 HeadContext和TailContext

HeadContext:

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);unsafe = pipeline.channel().unsafe();setAddComplete();}@Overridepublic ChannelHandler handler() {return this;}// handlerRemoved和handlerAdded实现为空@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {}//...// 【outbound】bind/connect/disconnect/close/deregister/read/write/flush@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}//...//  【inbound】channelRegistered/channelUnregistered、channelActive/channelInactive、channelRead/channelReadComplete、userEventTriggered/channelWritabilityChanged、exceptionCaught@Overridepublic void channelRegistered(ChannelHandlerContext ctx) {invokeHandlerAddedIfNeeded();ctx.fireChannelRegistered();}//...
}

整体上看,HeadContext属于ChannelInboundHandler也属于ChannelOutboundHandler,即可处理入站和出站事件:
[1] HeadContext内部维持了一个unbsafe对象,取自pipeline关联的通道的unsafe对象(后续文章介绍),HeadContext将内部所有的outbound事件委托给unsafe处理, 如上述源码的bind方法;
[2] 对于所有的Inbound消息,HeadContext直接通过Invoker向下游传递。
TailContext:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, TailContext.class);setAddComplete();}@Overridepublic ChannelHandler handler() {return this;}// channelRegistered/channelUnregistered、channelActive/channelInactive、handlerAdded/handlerRemoved、userEventTriggered/exceptionCaught、channelRead/channelReadComplete// 实现为空,即消息到此丢弃,不再向前或向后传递@Overridepublic void channelRegistered(ChannelHandlerContext ctx) { }//...
}

整体上看,TailContext是一个ChannelInboundHandler,对于所有的消息直接忽略,不向前和前后传递任何消息。

2.4 AbstractChannelHandlerContext抽象类

由于AbstractChannelHandlerContext承载的内容较多,为理解方便,对其按照功能进行拆分,分别介绍。

2.4.1 获取channel、Pipeline、handler、executor方法

channel、Pipeline、handler、executor是ctx工作的基础:
[1] 内部维持一个DefaultChannelPipeline类型的pipeline对象,赋值来自构造函数;
[2] channel与pipeline相互绑定, 因此可通过pipeline获取channel;
[3] handler在子类中实现;
[4] executor来自构造函数,一般入参为空,使用channel绑定的NioEventLoop线程池对象;
对应的方法实现如下:

@Override
public Channel channel() {return pipeline.channel();
}@Override
public ChannelPipeline pipeline() {return pipeline;
}@Override
public EventExecutor executor() {if (executor == null) {return channel().eventLoop();} else {return executor;}
}

2.4.2 next和prev属性

Pipeline内部包含一个ChannelHandlerContext类型的双向链表,ctx通过next和prev的指向将多个ctx串连成链表,属性定义如下:

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

因此,pipeline内部仅需要维护首尾节点即可。

2.4.3 executionMask属性和findContextInbound和findContextOutbound方法

Netty引入executionMask表示该ctx对象可处理的事件类型。findContextInbound(mask)用于沿着head->ctx->tail的顺序查找下一个可以处理mask类型Inbound事件的ctx对象;findContextOutbound沿着反向(tail -> ctx -> head)顺序查找上一个可以处理mask;类型Outbound事件的ctx对象。
先看一下mask的定义:

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
//... 从1 << 1定义至1 << 16,用于表示不同的读写事件
static final int MASK_CHANNEL_READ = 1 << 5;
//...
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

具体为每个读写事件定义了一个宏定义,MASK_ONLY_INBOUND包括所有的Inbound事件,MASK_ALL_OUTBOUND包括所有的Outbound事件,MASK_ALL_INBOUND包括所有的Inbound和Outbound事件。
ctx对象在构造阶段,通过ChannelHandler的类型确定executionMask属性值:

this.executionMask = mask(handlerClass);static int mask(Class<? extends ChannelHandler> clazz) {Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();Integer mask = cache.get(clazz);if (mask == null) {mask = mask0(clazz);cache.put(clazz, mask);}return mask;
}

mask对mask0添加了一层缓存优化:

private static int mask0(Class<? extends ChannelHandler> handlerType) {int mask = MASK_EXCEPTION_CAUGHT;if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_INBOUND;if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_REGISTERED;}//... 省略相同逻辑:channelUnregistered、channelActive/channelInactive、channelRead/channelReadComplete、channelWritabilityChanged/userEventTriggeredif (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}}if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_OUTBOUND;if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_BIND;}//... 省略相同逻辑:connect/disconnect、close/deregister, read、write/flushif (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}}//...return mask;
}

逻辑较为简单,根据ChannelHandler的类型和是否实现对应的方法(不包括从父类继承的)计算mask值。如前面提到的ChannelInboundHandlerFirst:

public class ChannelInboundHandlerFirst extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 打印消息System.out.println(msg);// 修改消息,并继续向下传递消息ctx.fireChannelRead("First_" + msg);}
}

属于Inbound类型,且仅实现了channelRead方法,因此ChannelInboundHandlerFirst最终计算得到的mask值为32, 即宏定义MASK_CHANNEL_READ的值.
findContextInbound和findContextOutbound方法根据mask的值沿着pipeline的ctx链查找满足条件的ctx对象。
如findContextInbound(MASK_CHANNEL_READ)和findContextInbound(MASK_CHANNEL_READ_COMPLETE)表示查找下一个可以处理channelRead和channelReadComplete事件的InboundHandler, 具体实现如下:

private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {// 从当前ctx的下一个向后搜索ctx = ctx.next;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));return ctx;
}private AbstractChannelHandlerContext findContextOutbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {// 从当前ctx的上一个向前搜索ctx = ctx.prev;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));return ctx;
}

2.4.4 handlerState状态属性

引入handlerState属性记录加入Pipeline的状态,存在4种类型:

// 初始状态
private static final int INIT = 0;
// 正在加入到Pipeline
private static final int ADD_PENDING = 1;
// 已成功加入Pipeline
private static final int ADD_COMPLETE = 2;
// 已成功从pipeline移除
private static final int REMOVE_COMPLETE = 3;

handlerState初始值为INIT,引入一个原子对象用于修改handlerState的值:

private volatile int handlerState = INIT;private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

提供了三个方法(提供给Pipeline对象)用于设置状态值,以及提供一个方法查询是否被移除:

// 从INIT状态设置为ADD_PENDING状态
final void setAddPending() {boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);assert updated;
}// 设置为REMOVE_COMPLETE状态
final void setRemoved() {handlerState = REMOVE_COMPLETE;
}// 设置为ADD_COMPLETE状态
final boolean setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE) {return false;}if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return true;}}
}
// 查询是否被移除
public boolean isRemoved() {return handlerState == REMOVE_COMPLETE;
}

2.4.5 处理Inbound和Outbound事件

ChannelHandlerContext因实现了ChannelInboundInvoker接口具备了触发Inbound事件的能力,包括fireChannelRegistered/fireChannelUnregistered, fireChannelActive/fireChannelInactive, fireChannelRead/fireChannelReadComplete等。通过调用ctx对象的这些方法可以将消息以Inbound事件的方式在ctx链路间中传递,即将消息发送给Pipeline处理。

由于机制相同,本文仅以channelRead为案例进行介绍。

调用ctx的fireChannelRead时通过msg参数传递消息:

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;
}

findContextInbound(MASK_CHANNEL_READ)前文已介绍:从当前ctx(不包括当前)向后找,找到下一个可以处理channelRead的ctx对象。
继续根据invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {// pipeline对特殊类型的处理,这里可以理解为直接返回msg(不处理)final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}

这里判断当前线程是否是ctx的线程池线程,如果是直接执行next.invokeChannelRead(m), 否则提交给ctx的线程池线程执行;从而确保了next.invokeChannelRead(m)任务只能由ctx的线程池执行。
使用Netty框架时,向pipeline添加ChannelHandler时一般不会额外指定线程池,因此ctx对象的线程池对象就是channel绑定的NioEventLoop对象;因此,此处逻辑为直接执行next.invokeChannelRead(m).
跟进next.invokeChannelRead(m);语句,实现如下:

private void invokeChannelRead(Object msg) {// 根据状态判断是否可以处理if (invokeHandler()) {try {// 进入Handler的channelRead方法((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {invokeExceptionCaught(t);}} else {// 当前handler不处理,继续向下传递fireChannelRead(msg);}
}

逻辑较为清晰,通过invokeHandler()判断当前ctx状态(handlerState)是否可以处理(是否已加入Pipeline),如果可以获取ctx的handler, 并调用handler的channelRead处理msg消息,否者将消息通过fireChannelRead继续向下传递。
处理Outbound事件与处理Inbound事件相同,不再单独展开介绍。

3.ChannelPipeline

整体上看,ChannelPipeline与ctx和handler的关系如下图所示:
在这里插入图片描述
在理解ChannelHandler和ctx后,可以将Pipeline理解为对二者的封装和增强。需要对外提供添加handler到pipeline链的方法以及触发Inbound和Outbound事件的方法,因此ChannelPipeline也需要继承ChannelInboundInvoker和ChannelOutboundInvoker。
接口定义如下:

public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {ChannelPipeline addFirst(ChannelHandler... handlers);ChannelPipeline addLast(ChannelHandler... handlers);// ... 定义在pipeline增删改查ChannelHandler的接口// 重写ChannelInboundInvoker和ChannelOutboundInvoker方法,返回值修改为ChannelPipeline// 重写的方法有:	//fireChannelRegistered/fireChannelUnregistered、fireChannelActive/fireChannelInactive、fireChannelRead/fireChannelReadComplete、fireExceptionCaught/fireUserEventTriggered、fireChannelWritabilityChanged、flush@OverrideChannelPipeline fireChannelRegistered();// ...
}

核心接口可以分为两类:
[1] 将ChannelHandler添加到pipeline的ctx链路;
[2] ChannelInboundInvoker和ChannelOutboundInvoker提供的invoke方法;
由于AbstractChannelHandlerContext承载的内容较多,为理解方便,对其按照功能进行拆分,分别介绍。

3.1 head和tail属性

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;protected DefaultChannelPipeline(Channel channel) {//...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

创建DefaultChannelPipeline对象时,实例化TailContext和HeadContext,并相互指向。

3.2 channel

pipeline与channel相互持有,每个channel都有一个所属的pipeline.

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");//...
}

构造DefaultChannelPipeline时需要传入channel对象。

3.3 registered注册状态

DefaultChannelPipeline中的registered表示该Pipeline关联的channel是否已被注册成功(注册到选择器)。当注册成功时,registered会被设置为true.

3.4 添加handler

pipeline重载了很多添加handler的方法,以addLast(ChannelHandler)为例进行介绍:

public final ChannelPipeline addLast(ChannelHandler handler) {return addLast(null, handler);
}public final ChannelPipeline addLast(String name, ChannelHandler handler) {return addLast(null, name, handler);
}

继续跟进重载的last方法, 此时入参group和name为空:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}callHandlerAdded0(newCtx);return this;
}

逻辑可以分为四个部分:
[1] 添加前的校验: checkMultiplicity校验非注解Sharable的ChannelHandlerAdapter类型的Handler对象是否重复添加, 对其他类型不进行校验。
[2] 构造ctx对象:newContext方法根据传入ChannelHandler构造ctx对象;
[3] 加入pipeline链表:addLast0将新创建的ctx添加值tail的前驱结点;
[4] 加入后逻辑:根据pipeline是否已注册、当前线程是否为ctx线程池线程确定不同的后续逻辑。
checkMultiplicity、newContext和addLast0逻辑较为简单,不进行介绍,这里重点关注一下第四部分。
pipeline未注册:
当pipeline未注册时(关联的channel未注册到选择器),按如下逻辑执行:

if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;
}

将ctx对象的状态设置为加入中AddPending,并调用callHandlerCallbackLater将任务保存到内存中,以便注册后取出来执行。

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {assert !registered;PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);PendingHandlerCallback pending = pendingHandlerCallbackHead;if (pending == null) {pendingHandlerCallbackHead = task;} else {while (pending.next != null) {pending = pending.next;}pending.next = task;}
}

将ctx对象包装为PendingHandlerAddedTask对象,添加到pendingHandlerCallbackHead队尾。这里的pendingHandlerCallbackHead属性保存了任务链表,当pipeline被注册后,可通过调用pipeline的invokeHandlerAddedIfNeeded方法按序执行这些任务。

final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();if (firstRegistration) {firstRegistration = false;callHandlerAddedForAllHandlers();}
}

firstRegistration布尔属性保证了callHandlerAddedForAllHandlers只会被调用一次,跟进callHandlerAddedForAllHandlers方法:

private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;registered = true;pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;this.pendingHandlerCallbackHead = null;}PendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {task.execute();task = task.next;}
}

registered属性设置为true后从pendingHandlerCallbackHead属性取出PendingHandlerAddedTask任务,并依次执行。查看PendingHandlerAddedTask的execute方法:

void execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {try {executor.execute(this);} catch (RejectedExecutionException e) {if (logger.isWarnEnabled()) {logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",executor, ctx.name(), e);}atomicRemoveFromHandlerList(ctx);ctx.setRemoved();}}
}

callHandlerAdded0(ctx);方法调用委托给ctx的线程池(如果当前线程是ctx的线程池,直接执行)。
pipeline已注册:
已注册时,整理得到的执行逻辑如下:

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;
} else {callHandlerAdded0(newCtx);return this;
}

与PendingHandlerAddedTask的execute方法的逻辑类似,将callHandlerAdded0(newCtx);的执行任务交给ctx的线程池。
callHandlerAdded0方法删除catch的异常逻辑后代码如下:

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {//... 省略异常处理逻辑}
}final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);}
}

核心逻辑为调用ctx关联的channelHandler对象的handlerAdded方法。
一般而言,自定义channelHandler的handlerAdded方法的实现为空;这里是为了处理ChannelInitializer这类特殊的channelHandler子类:

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);}}}
}

这里的步骤是调用ChannelInitializer的initChannel方法(细节在netty启动流程文章中介绍)。

3.6 pipeline的封装

pipeline的作用是处理channel相关的IO消息,在Netty框架中对应Inbound和Outbound事件;因此需要向外暴露触发Inbound和Outbound事件的接口,以便向消息发送给Pipeline处理。
逻辑相同,本文仅以fireChannelRead为例介绍。

public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}

这里的head是pipeline的ctx链路的首节点,跟踪AbstractChannelHandlerContext的invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}

通过executor.inEventLoop()判断:非ctx线程—将核心逻辑提交给ctx线程池处理,否则直接执行。核心逻辑通过调用head的invokeChannelRead方法,将msg消息沿着head向下传递,消息传递过程在本章第2章已介绍过,不再赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147863.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能不是人工“制”能

文/孟永辉 如果你去过今年在上海举办的世界人工智能大会&#xff0c;就会知道当下的人工智能行业在中国是多么火爆。 的确&#xff0c;作为第四次工业革命的重要组成部分&#xff0c;人工智能愈发引起越来越多的重视。 不仅仅是在中国&#xff0c;当今世界的很多工业强国都在将…

828华为云征文|云服务器Flexus X实例|MacOS系统-宝塔部署Nuxt项目

文章目录 1. Flexus云服务器X实例1.1 与Flexus应用服务器L实例相比具备以下优势1.2 服务器的详细配置 2.宝塔部署Nuxt项目2.1 登录实例2.1 宝塔面板 3. Nuxt 项目与部署3.1 Nuxt3.2创建Nuxt项目3.3 部署3.4 部署成功 4.结语 1. Flexus云服务器X实例 华为云的Flexus云服务是为中…

C++高精度计时方法总结(测试函数运行时间)

文章目录 一、clock()函数——毫妙级二、GetTickCount()函数&#xff08;精度16ms左右&#xff09;——毫妙级三、高精度时控函数QueryPerformanceCounter()——微妙级四、高精度计时chrono函数——纳妙级五、几种计时比较六、linux下的计时函数gettimeofday()-未测试参考文献 …

typedef的用法

typedef只有一种用法&#xff0c;那就是&#xff1a; 1,代替各种类型或某类&#xff08;结构体&#xff09;成员。 比如下列代码&#xff1a; #include <iostream> #include <string> int main() {typedef int i;i e3;int f3;std::string t_or_f(ef)?"tru…

OpenAI的O1模型达到AGI二级,类人推理能力被提示危险,细思极恐!

大家好&#xff0c;我是Shelly&#xff0c;一个专注于输出AI工具和科技前沿内容的AI应用教练&#xff0c;体验过300款以上的AI应用工具。关注科技及大模型领域对社会的影响10年。关注我一起驾驭AI工具&#xff0c;拥抱AI时代的到来。 今天让我们一起来聊聊最近科技圈的大新闻—…

利士策分享,家庭内耗:隐形的风暴,无声的侵蚀

利士策分享&#xff0c;家庭内耗&#xff1a;隐形的风暴&#xff0c;无声的侵蚀 在温馨的灯光下&#xff0c;家本应是我们心灵的港湾&#xff0c;是疲惫时最坚实的依靠。 然而&#xff0c;当家庭内部出现裂痕&#xff0c;无形的内耗便如同冬日里的寒风&#xff0c;悄无声息地…

SpringBoot 3.4.0还没来之前,又又又更新啦!SpringBoot 3.3.4版本依赖升级,性能与稳定性再提升!

为什么要使用SpringBoot在现代开发中&#xff0c;高效与灵活性是每个开发团队追求的核心目标。然而&#xff0c;如何在不牺牲灵活性的前提下&#xff0c;快速构建复杂的应用程序&#xff0c;常常成为开发者的难题。SpringBoot的出现&#xff0c;正是为了解决这个矛盾。它以“约…

Spring Boot技术在高校心理辅导系统中的应用研究

3 系统分析 3.1可行性分析 在进行可行性分析时&#xff0c;我们通常根据软件工程里方法&#xff0c;通过四个方面来进行分析&#xff0c;分别是技术、经济、操作和法律可行性。因此&#xff0c;在基于对目标系统的基本调查和研究后&#xff0c;对提出的基本方案进行可行性分析。…

【C++初阶】探索STL之——vector

【C初阶】探索STL之——vector 1.什么是vector2.vector的使用2.1 vector的定义2.2 vector iterator(迭代器)的使用2.3 vector空间问题2.4 vector的增删查改2.5 vector迭代器失效的问题2.5.1 vector常见迭代器失效的操作 3 动态二位数组 1.什么是vector vector其实就是一个可以…

GNU链接器(LD):设置入口点(ENTRY命令)的用法及实例解析

0 参考资料 GNU-LD-v2.30-中文手册.pdf GNU linker.pdf1 前言 一个完整的编译工具链应该包含以下4个部分&#xff1a; &#xff08;1&#xff09;编译器 &#xff08;2&#xff09;汇编器 &#xff08;3&#xff09;链接器 &#xff08;4&#xff09;lib库 在GNU工具链中&…

3.5.2 __ipipe_init()之完成中断处理程序设置

点击查看系列文章 》 Interrupt Pipeline系列文章大纲-CSDN博客 原创不易&#xff0c;需要大家多多鼓励&#xff01;您的关注、点赞、收藏就是我的创作动力&#xff01; 3.5.2 __ipipe_init()之完成中断处理程序设置 __ipipe_init()最核心的就是__ipipe_enable_pipeline()&am…

Mybatis自定义TypeHandler,直接存储枚举类对象

在这篇文章中&#xff0c;我们已经知道如何使用枚举类直接接受前端的数字类型参数&#xff0c;省去了麻烦的转换。如果数据库需要保存枚举类的code&#xff0c;一般做法也是代码中手动转换&#xff0c;那么能不能通过某种机制&#xff0c;省去转换&#xff0c;达到代码中直接保…

PowerMill 2025简体中文版百度云资源分享下载

如大家所了解的&#xff0c;PowerMill是一款专业的CAM&#xff08;计算机辅助制造&#xff09;软件。主要用于加工行业&#xff0c;可以帮助用户进行高效、精准的加工工艺设计和数控编程&#xff0c;以达到生产部件的高精度和高质量。 对于初次接触的小伙伴来说&#xff0c;目…

k均值vs高斯混合模型

K均值&#xff08;K-means&#xff09;和高斯混合模型&#xff08;Gaussian Mixture Model, GMM&#xff09;是常用的聚类算法。 K均值是非概率模型&#xff0c;根据&#xff08;欧氏&#xff09;距离判断&#xff0c;类比最小距离分类器&#xff08;分类&#xff09;。高斯混…

240922-chromadb的基本使用

A. 基本使用 ChromaDB 是一个专门为向量数据库和嵌入查询优化的数据库。它可以与嵌入模型结合使用&#xff0c;存储和查询高维向量数据&#xff0c;通常用于大规模语义搜索、推荐系统等领域。 以下是使用 ChromaDB 的步骤&#xff1a; 1. 安装 ChromaDB 你可以通过 pip 安装…

96. UE5 GAS RPG 实现闪电链技能(一)

闪电链有一个施法的过程&#xff0c;就是在按键按下的过程&#xff0c;会在按下的过程一直持续造成伤害&#xff0c;一直等到条件不满足&#xff08;技能键位抬起&#xff0c;蓝量不足&#xff0c;被眩晕&#xff09;时&#xff0c;将结束技能&#xff0c;并退出技能状态。 所以…

【WSL迁移】将WSL2迁移到D盘

首先查看WSL状态&#xff1a;wsl -l -v 以压缩包的形式导出到其他盘。 wsl --export Ubuntu D:\Ubuntu_WSL\ubuntu.tar 注销原有的linux系统 wsl --unregister Ubuntu 导入系统到D盘 wsl --import Ubuntu D:\Ubuntu_WSL D:\Ubuntu_WSL\Ubuntu.tar 恢复默认用户 Ubuntu co…

如何保护您的机器学习模型

在计算机技术领域&#xff0c;很少有领域像人工智能(AI)和机器学习(ML)一样受到如此多的关注。这门学科位于计算机科学和数据分析的交叉点&#xff0c;已成为移动应用程序、语音助手、欺诈交易检测、图像识别、自动驾驶甚至医疗诊断不可或缺的一部分。 背景介绍由于机器学习模型…

数据结构与算法——Java实现 9.习题——删除链表倒数节点

目录 19. 删除链表的倒数第 N 个结点 方法1 通过链表长度直接删除 方法2 递归加入哨兵节点 ListNode 方法3 快慢指针法 苦难&#xff0c;区区挫折罢了&#xff0c;而我必定站在幸福的塔尖 —— 24.9.22 19. 删除链表的倒数第 N 个结点 给你一个链表&#xff0c;删除链表的倒数第…

预付费计量系统整体概念

1.预付费计量系统整体概念 A Payment Metering System is a collective infrastructure that supports the contractual relationship between a supplier of goods or services and a customer. It includes processes, functions, data elements, system entities (devices a…