【Netty】ByteToMessageDecoder源码解析

目录

1.协议说明

2.类的实现

3.Decoder工作流程

4.源码解析

4.1 ByteToMessageDecoder#channelRead

4.2 累加器Cumulator

 4.3 解码过程

4.4 Decoder实现举例

5. 如何开发自己的Decoder


1.协议说明

Netty框架是基于Java NIO框架,性能彪悍,支持的协议丰富,广受Java爱好者亲莱,支持如下协议

  • TCP/UDP:Netty提供了基于NIO的TCP和UDP编程框架,可以用来构建高性能、高可用性的网络应用。
  • HTTP/HTTPS:Netty提供了HTTP/HTTPS编程框架,可以用来开发Web服务器和客户端。
  • WebSocket:Netty提供了WebSocket编程框架,可以用来实现双向通信应用程序,如聊天室等。
  • SPDY/HTTP2:Netty提供了SPDY和HTTP2编程框架,可以用来实现高效的Web应用程序。
  • MQTT/CoAP:Netty提供了MQTT和CoAP编程框架,可以用来构建IoT应用程序。
     

我们在基于Netty框架开发过程中往往需要自定义私有协议,如端到端的通信协议,端到平台数据通信协议,我们需要根据业务的特点自定义数据报文格式,举例如下:

数据报文格式定义(TCP)
帧头版本命令标识符序列号设备编码帧长正文校验码
1byte1byte1byte2byte4byte       4byteN个byte2byte

假如我们定义了上述私有协议的TCP报文,通过netty框架发送和解析

发送端:某类通信设备(client)

接收端:Java应用服务(Server)

本节我主要分析一下server端解析报文的一个过程,client当然也很重要,尤其在建立TCP连接和关闭连接需要严格控制,否则服务端会发现大量的CLOSE_WAIT(被动关闭连接),甚至大量TIME_WAIT(主动关闭连接),关于这个处理之前的文章有讲解。

本节Server端是基于Netty版本:netty-all-4.1.30.Final

本节源码分析需求就是要解析一个自定义TCP协议的数据报文进行解码,关于编码解码熟悉网络编程的同学都明白,不清楚的可以稍微查阅一下资料有助于学习为什么要解码以及如何解码。本节不会对具体报文的解析做具体讲解,只对Netty提供的解码器基类ByteToMessageDecoder做一下源码分解,以及如何使用ByteToMessageDecoder开发属于自己的Decoder,接下来我们看看ByteToMessageDecoder的定义。

#继承ChannelInboundHandlerAdapter
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
}

2.类的实现

解码器的ByteToMessageDecoder ,该类继承了ChannelInboundHandlerAdapter ,ChannelInboundHandlerAdapter继承ChannelHandlerAdapter,

ChannelInboundHandlerAdapter实现ChannelInboundHandler接口,也就是说ChannelInboundHandler定义了解码器需要处理的工作(方法)
ChannelInboundHandlerAdapter是一个适配器模式,负责Decoder的扩展。它的实现有很多,简单列举一下:
  • HeartBeatHandler
  • MessageToMessageDecoder
  • SimpleChannelInboundHandler(抽象了方法channelRead0)
  •  ByteToMessageDecoder
  •  。。。。。。

以上都是比较常用的Decoder或Handler,基于这些基类还定义了很多handler,有兴趣的同学可以跟代码查阅。

3.Decoder工作流程

每当数据到达Server端时,SocketServer通过Reactor模型分配具体的worker线程进行处理数据,处理数据就需要我们的事先定义好的Decoder以及handler,假如我们定义了以下两个对象:

  • MyDecoder extends ByteToMessageDecoder{} 作为解码器
  • MyHandler extends SimpleChannelInboundHandler{} 作为解码后的业务处理器

worker线程——〉MyDecoder#channelRead实际就是调用ByteToMessageDecoder#channelRead——〉Cumulator累加器处理——〉

解码器decode处理(MyDecoder需要实现decode方法)——〉Myhandler#channelRead0处理具体的数据(msg)

4.源码解析

4.1 ByteToMessageDecoder#channelRead

    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//如果是设置在ServerBootstrap的childHandler那么msg的对象类型就是ByteBuf,否则就执行elseif (msg instanceof ByteBuf) {//CodecOutputList对象可以查阅文档https://www.freesion.com/article/4800509769///这个out对象随着callDecode方法进行传递,解码后的数据保存在out中CodecOutputList out = CodecOutputList.newInstance();try {ByteBuf data = (ByteBuf) msg;//1.cumulation是累加器,处理tcp半包与粘包问题first = cumulation == null;if (first) {//2.第一次收到数据累加器为nullcumulation = data;} else {//3.第二次收到数据累加器需要评估ByteBuf的capacity,够用则追加到cumulation,capacity不够则进行扩容cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);}//4.调用callDecode进行解码//5.CodecOutputList out对象保存解码后的数据,它的实现是基于AbstractList,//重新定义了add(),set(),remove()等方法,其中add()方法实现对Array数组中//进行insert,没有直接拷贝而是通过对象引用,将对象指向数据索引的index,是性能的一个提升。callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {//6.如果累加器cumulation中的数据被解码器读完了,则可以完全释放累加器cumulationif (cumulation != null && !cumulation.isReadable()) {numReads = 0;cumulation.release();cumulation = null;} else if (++ numReads >= discardAfterReads) {// We did enough reads already try to discard some bytes so we not risk to see a OOME.// See https://github.com/netty/netty/issues/4275//7.释放累加器cumulation里面的已读数据,防止cumulation无限制增长numReads = 0;discardSomeReadBytes();}int size = out.size();decodeWasNull = !out.insertSinceRecycled();//8.解码完成后需要触发事先定义好的handler的channelRead()方法处理解码后的out数据fireChannelRead(ctx, out, size);//9.最终需要回收out对象out.recycle();}} else {//10.非ByteBuf直接向后触发传递ctx.fireChannelRead(msg);}}

4.2 累加器Cumulator

累加器的作用是解决tcp数据包中出现半包和粘包问题。

半包:接收到的byte字节不足一个完整的数据包,

半包处理办法:不足一个完整的数据包先放入累加器不做解码,等待续传的数据包;

粘包:接收到的byte字节数据包中包括其他数据包的数据(靠数据包协议中定义的帧头帧尾标识来识别,多于1个以上的帧头或帧尾数据包为粘包数据),

粘包处理办法:按照数据包帧结构定义去解析,需要结合累加器,解析完一个数据包交给handler去处理,剩下的不足一个数据包长度的字节保存在累加器等待续传的数据包收到之后继续解码。

ByteToMessageDecoder内部定义了Cumulator接口

    /*** Cumulate {@link ByteBuf}s.*/public interface Cumulator {/*** Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.*/ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);}

其中在类最开始的时候构建了两个对象,分别是MERGE_CUMULATOR,COMPOSITE_CUMULATOR,代码如下

    /*** Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.*/public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {try {final ByteBuf buffer;//1.如果累加器ByteBuf 剩余可写的capacity不满足当前需要写入的ByteBuf(in)长度,则进行扩容累加器ByteBuf容量,执行expandCumulation方法if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer = cumulation;}//2.写入累加器并返回更新后的cumulationbuffer.writeBytes(in);return buffer;} finally {// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw// for whatever release (for example because of OutOfMemoryError)//3.由于是对in的拷贝,所以需要releasein.release();}}};//通过对CompositeByteBuf的累加器的实现,CompositeByteBuf内部使用ComponentList//实现对ByteBuf进行追加//ComponentList是ArrayList的实现,所以每次Add操作都是一次内存拷贝。public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;try {if (cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {composite = (CompositeByteBuf) cumulation;} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE);composite.addComponent(true, cumulation);}composite.addComponent(true, in);in = null;buffer = composite;}return buffer;} finally {if (in != null) {//因为是对ByteBuf in的拷贝,所以需要释放in.release();}}}};

 4.3 解码过程

ByteToMessageDecoder#channelRead看到了将累加器交给callDecoder方法

//这里ByteBuf in 就是累加器对象cumulaction
// List<Object> out解码后的对象
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {//1. 循环读取累加器对象的bytewhile (in.isReadable()) {int outSize = out.size();//2.如果解码后out对象中产生数据则触发后边的handler(MyHandler)处理数据if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();if (ctx.isRemoved()) {break;}outSize = 0;}//3.继续解析累加器传递过来的byteint oldInputLength = in.readableBytes();//4.注意out对象是从channelRead()方法传递过来,继续传递下去decodeRemovalReentryProtection(ctx, in, out);if (ctx.isRemoved()) {break;}//4.如果这次解码没有获得任何消息if (outSize == out.size()) {//5.如果解码器decode没有消费累加器 in 任何字节,结束循环                    if (oldInputLength == in.readableBytes()) {break;//6.否则继续循环调用解码器decode} else {continue;}}//7.如果累加器ByteBuf in中可读字节数依然没有变化,说明实现的解码器decode()方法有问题,需要检查自身代码问题if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}//8.是否设定每次调用解码器一次,如果是,则结束本次解码if (isSingleDecode()) {break;}                }} catch (DecoderException e) {} catch (Exception cause) {}}

继续查看ByteToMessageDecoder#decodeRemovalReentryProtection方法

    //1.此方法不允许重写final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {//2.核心方法decode,这是一个抽象方法,没有实现,需要在自定义的Decoder(Mydecoder)进行实现//3.自定义Decoder需要将解码后的数据放入到out对象中decode(ctx, in, out);} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {handlerRemoved(ctx);}}}//解码decode方法需要子类(自定义的实现类)去实现该方法,最终将解码后的数据放入List<Object> outprotected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

4.4 Decoder实现举例

基于ByteToMessageDecoder的实现很多,简单列举一下

  • JsonObjectDecoder
  • RedisDecoder
  • XmlDecoder
  • MqttDecoder
  • ReplayingDecoder
  • SslDecoder
  • DelimiterBasedFrameDecoder
  • FixedLengthFrameDecoder
  • LengthFieldBasedFrameDecoder
  • ....

我们拿JsonObjectDecoder举例如下:

    @Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 省略代码。。。。。。int idx = this.idx;int wrtIdx = in.writerIndex();//省略代码。。。。。。。for (/* use current idx */; idx < wrtIdx; idx++) {byte c = in.getByte(idx);if (state == ST_DECODING_NORMAL) {decodeByte(c, in, idx);if (openBraces == 0) {ByteBuf json = extractObject(ctx, in, in.readerIndex(), idx + 1 - in.readerIndex());//1.解析后的对象加入out中if (json != null) {out.add(json);}in.readerIndex(idx + 1);reset();}} else if (state == ST_DECODING_ARRAY_STREAM) {//2.自身实现解析json格式的方法decodeByte(c, in, idx);if (!insideString && (openBraces == 1 && c == ',' || openBraces == 0 && c == ']')) {for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) {in.skipBytes(1);}// skip trailing spaces.int idxNoSpaces = idx - 1;while (idxNoSpaces >= in.readerIndex() && Character.isWhitespace(in.getByte(idxNoSpaces))) {idxNoSpaces--;}ByteBuf json = extractObject(ctx, in, in.readerIndex(), idxNoSpaces + 1 - in.readerIndex());//3.解析后的对象加入out中if (json != null) {out.add(json);}in.readerIndex(idx + 1);if (c == ']') {reset();}}} //省略代码。。。。。。}if (in.readableBytes() == 0) {this.idx = 0;} else {this.idx = idx;}this.lastReaderIndex = in.readerIndex();}

5. 如何开发自己的Decoder

读了ByteToMessageDecoder的部分源码,以及它的实现JsonObjectDecoder,那么如果我们自己实现一个Decoder该如何实现,这里提供三个思路给大家,有时间再补充代码。

  • 基于ByteToMessageDecoder实现,MyDecoder extends ByteToMessageDecoder{实现decode()方法},可参考RedisDecoder、XmlDecoder等实现。
  • 基于ChannelInboundHandlerAdapter实现,这个时候需要自己负责解决TCP报文半包和粘包问题,重写其中的channelRead()方法。
  • 直接使用已经实现ByteToMessageDecoder的解码器,如FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。

注意事项:

 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
 * annotated with {@link @Sharable}.

ByteToMessageDecoder的子类不能使用@Sharable注解修饰,因为解码器只能单独为一个Channel进行解码,也就是说每个worker线程需要独立的Decoder。


 * <p>
 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
 * to avoid leaking memory.

如果基于ChannelInboundHandlerAdapter自己实现Decoder#channelRead()方法时注意内存泄露问题,ByteBuf#readBytes(int)方法会产生一个新的ByteBuf,需要手动释放。

或者

基于ByteToMessageDecoder实现decode()方法时将解析后的对象放入out对象中(上面源码分析中有提示)

或者

使用派生的ByteBuf,如调用ByteBuf#readSlice(int)方法,返回的ByteBuf与原有ByteBuf共享内存,不会产生新的Reference count,可以避免内存泄露。

Netty Project官网也有说明:

Reference counted objects
ByteBuf.duplicate(), ByteBuf.slice() and ByteBuf.order(ByteOrder) create a derived buffer which shares the memory region of the parent buffer. A derived buffer does not have its own reference count, but shares the reference count of the parent buffer.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145862.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

在Qt中,怎么获取到在mainwindow.ui文件中添加的控件

2023年9月30日&#xff0c;周六晚上 假设我在mainwindow.ui中添加了一个名为textEdit的QTextEdit对象 在mainwindow.cpp中&#xff0c;可以通过ui对象来获取到这个控件

让大脑自由

前言 作者写这本书的目的是什么&#xff1f; 教会我们如何让大脑更好地为自己工作。 1 大脑的运行机制是怎样的&#xff1f; 大脑的基本运行机制是神经元之间通过突触传递信息&#xff0c;神经元的兴奋和抑制状态决定了神经网络的运行和信息处理&#xff0c;神经网络可以通过…

【图像分割】图像检测(分割、特征提取)、各种特征(面积等)的测量和过滤(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

系统集成|第二十一章(笔记)

目录 第二十一章 知识产权与法律法规21.1 知识产权21.2 法律法规 上篇&#xff1a;第二十章、收尾管理 第二十一章 知识产权与法律法规 21.1 知识产权 概述&#xff1a;狭义的知识产权就是传统意义上的知识产权&#xff0c;包括著作权&#xff08;含邻接权&#xff09;&#x…

iOS自动化测试方案(二):Xcode开发者工具构建WDA应用到iphone

文章目录 一、环境准备1.1、软件环境1.2、硬件环境1.3、查看版本 二、安装WDA过程2.7、构建失败&#xff0c;这类错误有很多&#xff0c;比如在选择开发者账号后&#xff0c;就会提示:Failed to register bundle identifier表示应用唯一注册失败2.9、第二个错误&#xff0c;完全…

嵌入式Linux应用开发-第十四章查询方式的按键驱动程序

嵌入式Linux应用开发-第十四章查询方式的按键驱动程序 第十四章 查询方式的按键驱动程序_编写框架14.1 LED驱动回顾14.2 按键驱动编写思路14.3 编程&#xff1a;先写框架14.3.1 把按键的操作抽象出一个button_operations结构体14.3.2 驱动程序的上层&#xff1a;file_operation…

Oracle的递归公共表表达式

查询节点id为2的所有子节点的数据&#xff0c;包括向下级联 WITH T1 (id, parent_id, data) AS (SELECT id, parent_id, dataFROM nodesWHERE id 2UNION ALLSELECT t.id, t.parent_id, t.dataFROM nodes tJOIN T1 n ON t.parent_id n.id ) SELECT * FROM T1; --建表语句 C…

layui 树状控件tree优化

先上效果图&#xff1a; 我选的组件是这个&#xff1a; 动态渲染完后&#xff0c;分别在窗体加载完成&#xff0c;节点点击事件分别加入js&#xff1a; //侧边栏图标替换//layui-icon-subtraction$(function () {$(".layui-icon-file").addClass("backs&quo…

【HTML】表格行和列的合并

概述 当我们需要在 HTML 表格中展示复杂的数据时&#xff0c;行和列的合并可以帮助我们实现更灵活的布局和结构。通过合并行和列&#xff0c;我们可以创建具有更多层次和结构的表格&#xff0c;使数据更易于理解和分析。 在 HTML 表格中&#xff0c;我们可以使用 rowspan 和 …

普通用户在Linux下免密执行sudo命令,真的可以吗?

主旨 在linux的日常运维中&#xff0c;我们会发现&#xff0c;使用root用户的权限太大了&#xff0c;很多时候一不小心就删错了&#xff0c;而且恢复不回来&#xff0c;我们应该怎么避免呢&#xff1f; 我们可以使用普通用户进行服务器的登录&#xff0c;如果有权限不够的情况&…

C/C++跨平台构建工具CMake-----在C++源码中读取CMakeLists.txt配置文件中的内容

文章目录 1.需求描述2.需求准备2.1 创建项目2.2 编辑CMakeLists.txt文件2.3 编写C文件2.4 编译构建项目 3.需求实现3.1 在CMakeLists.txt中输出日志信息3.2 增加配置生成C头文件3.3在C 源码中访问配置的值3.4 C文件中读取CMakeLists.txt中的字符串 总结 1.需求描述 当我们开发…

HTML之如何下载网页中的音频(二)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

Kafka(一)使用Docker Compose安装单机Kafka以及Kafka UI

文章目录 Kafka中涉及到的术语Kafka镜像选择Kafka UI镜像选择Docker Compose文件Kafka配置项说明KRaft vs Zookeeper和KRaft有关的配置关于Controller和Broker的概念解释Listener的各种配置 Kafka UI配置项说明 测试Kafka集群Docker Compose示例配置 Kafka中涉及到的术语 对于…

Linux—进程间通信之System V共享内存

目录 简介System V共享内存特点及用法 共享内存的创建共享内存的关联与去关联共享内存的删除共享内存通信代码实现总结 简介 System V共享内存是一种在Unix-like系统中广泛使用的共享内存机制。它是基于System V IPC&#xff08;Inter-Process Communication&#xff0c;进程间…

SpringBoot整合阿里云OSS文件存储解决方案

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 &#x1f4d6;所属专栏&#xff1a;Sp…

毛玻璃态登录表单

效果展示 页面结构组成 通过上述的效果展示可以看出如下几个效果 底部背景有三个色块并且效果是毛玻璃效果登录表单是毛玻璃效果登录表单的周围的小方块也是有毛玻璃效果并且与登录表单有层次效果 CSS3 知识点 filter 属性backdrop-filter 属性绝对定位属性动画属性 底部背…

(高阶) Redis 7 第16讲 预热/雪崩/击穿/穿透 缓存篇

面试题 什么是缓存预热/雪崩/击穿/穿透如何做缓存预热如何避免或减少缓存雪崩穿透和击穿的区别?穿透和击穿的解决方案出现缓存不一致时,有哪些修补方案缓存预热 理论 将需要的数据提前加载到缓存中,不需要用户使用的过程中进行数据回写。(比如秒杀活动数据等) 方案 1.…

idea Springboot 校园助学贷款系统VS开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 校园助学贷款系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统 具有完整的源代码和数据库&…