目录
一、Hello World
1. 目标
2. 服务器端
3. 客户端
4. 流程梳理
💡 提示
5. 运行结果截图
二、Netty执行流程
1. 流程分析
2. 代码案例
2.1. 引入依赖
2.2. 服务端
服务端
服务端处理器
2.3. 客户端
客户端
客户端处理器
2.4. 代码截图
一、Hello World
1. 目标
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
加入依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>
2. 服务器端
new ServerBootstrap().group(new NioEventLoopGroup()) // 1 .channel(NioServerSocketChannel.class) // 2.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 5ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}}).bind(8080); // 4
代码解读
- 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开
- 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现
还有
- 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给
ServerSocketChannel。
ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行
initChannel 以便添加更多的处理器
- 4 处,ServerSocketChannel 绑定的监听端口
- 5 处,SocketChannel 的处理器,解码 ByteBuf => String
- 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
3. 客户端
new Bootstrap().group(new NioEventLoopGroup()) // 1.channel(NioSocketChannel.class) // 2.handler(new ChannelInitializer<Channel>() { // 3@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder()); // 8}}).connect("127.0.0.1", 8080) // 4.sync() // 5.channel() // 6.writeAndFlush(new Date() + ": hello world!"); // 7
代码解读
- 1 处,创建 NioEventLoopGroup,同 Server
- 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
- 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端
SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- 4 处,指定要连接的服务器和端口
- 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
- 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
- 7 处,写入消息并清空缓冲区
- 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
- 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
4. 流程梳理
💡 提示
一开始需要树立正确的观念
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后
输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
-
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成...)传播给每个
handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
-
- handler 分 Inbound 和 Outbound 两类
- 把 EeventLoop 理解为处理数据的工人
-
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel
的待处理任务,任务分为普通任务、定时任务
-
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工
人
5. 运行结果截图
二、Netty执行流程
1. 流程分析
- Netty 抽象出两组线程池BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
- BossGroup和WorkerGroup类型都是NioEventLoopGroup
- NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是
NioEventLoop
- NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个 selector ,
用于监听绑定在其上的 socket 的网络通讯 用于监听绑定在其上的 socket 的网络通讯
- NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
- 每个BossNioEventLoop 循环执行的步骤有3步轮询 accept事件,处理 accept事件, 与client建立连
接 , 生成 NioSocketChannel ,并将其注册到某 个worker 的NIOEventLoop上的selector处理任务队
列的任务,即runAllTasks
- 每个Worker NIOEventLoop循环执行的步骤
轮询 read, write 事件,处理 i/o 事件,即read , write事件,在对应 NioSocketChannel处理,处理
任务队列的任务 ,即runAllTasks
- 每个Worker 的NIOEventLoop处理业务时,会使用 pipeline(管道), pipeline中包含了channel ,
即通过 pipeline可以获取到对应通道, 管道中维护了很多的处理器
2. 代码案例
2.1. 引入依赖
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.19.1</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.42.Final</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies>
2.2. 服务端
服务端
public class NettyServer {public static void main(String[] args) {//创建bossgroupEventLoopGroup bossGroup = new NioEventLoopGroup();//创建workerGroupEventLoopGroup workerGroup = new NioEventLoopGroup();try {//创建服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap();//设置启动的数学bootstrap.group(bossGroup, workerGroup)//设置老板组合工作组.channel(NioServerSocketChannel.class)//设置通道类型.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接数.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态//设置工作组的通道初始化,设置处理器的管道线.childHandler(new ChannelInitializer<SocketChannel>() {//初始化通道的方法protected void initChannel(SocketChannel channel) throws Exception {System.out.println("客户端 socketChannel 初始化" + channel);//处理器管道线中添加处理器channel.pipeline().addLast(new NettyServerHandler());}});System.out.println("服务器启动...");//绑定服务端口,返回异步通道返回对象ChannelFuture channelFuture = bootstrap.bind(6668).sync();//添加监听channelFuture.addListener(new ChannelFutureListener() {//监听操作完成public void operationComplete(ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()) {System.out.println("监听端口6668成功");} else {System.out.println("监听端口6668失败");}}});//设置异步通道关闭事件channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
服务端处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter{/**读事件,客户端发送数据会触发该方法1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址2. Object msg: 就是客户端发送的数据 默认Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();System.out.println("服务器读取线程 "+Thread.currentThread().getName()+" channel="+channel);System.out.println("server ctx="+ctx);ByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:"+channel.remoteAddress());}/**读完毕触发*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//写并且清缓冲区,一般讲,我们对这个发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));}/*处理异常, 一般是需要关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
2.3. 客户端
客户端
public class NettyClient {public static void main(String[] args) {//创建事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//重建客户端启动类Bootstrap bootstrap = new Bootstrap();bootstrap.group(group)//设置循环组.channel(NioSocketChannel.class)//设置通道//设置通道初始化.handler(new ChannelInitializer<SocketChannel>() {//初始化通道protected void initChannel(SocketChannel channel) throws Exception {//设置管道线上的处理器channel.pipeline().addLast(new NettyClientHandler());}});System.out.println("客户端 ok");//请求连接,返回异步结果对象ChannelFuture future = bootstrap.connect("127.0.0.1", 6668).sync();//设置异步通道关闭事件future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally {group.shutdownGracefully();}}
}
客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*通道就绪就会触发该方法*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client:" +ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));}/*通道内有读取事件时候触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));System.out.println("服务器地址:"+ctx.channel().remoteAddress());}/*异常*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}