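Preface
Source code.
This article looks at how to implement synchronous calls with Netty. Netty does not provide this capability on its own, but we can combine it with the utility classes in the JUC package (java.util.concurrent) to get it. Let's take a look!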
1:server
- Request and response objects
package com.dahuyou.netty.sync.msg;

public class Request {

    private String requestId;
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "Request{" +
                "requestId='" + requestId + '\'' +
                ", result=" + result +
                '}';
    }
}
package com.dahuyou.netty.sync.msg;

public class Response {

    private String requestId;
    private String param;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }
}
Server bootstrap class:
public class ServerSocket implements Runnable {

    private ChannelFuture f;

    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });
            // Assign the field instead of shadowing it with a local variable
            f = b.bind(7397).sync();
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
MyServerHandler:
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request msg = (Request) obj;
        // Build the response, echoing the request ID so the client can match it
        Response response = new Response();
        response.setRequestId(msg.getRequestId());
        response.setParam(msg.getResult() + " request succeeded, please accept and handle the returned result {}.");
        // Simulate 3 seconds of processing time
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ctx.writeAndFlush(response);
        // Release the inbound message
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
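In the message handler we sleep for 3 seconds to simulate processing time. This lets us test both the client's synchronous wait and the wait-timeout scenario later on.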
2:client
Client bootstrap class:
package com.dahuyou.netty.sync.client;

import com.dahuyou.netty.sync.codec.RpcDecoder;
import com.dahuyou.netty.sync.codec.RpcEncoder;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ClientSocket implements Runnable {

    // volatile: written by the client thread, polled by the main thread
    private volatile ChannelFuture future;

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            this.future = f;
            // Block until the connection is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}
MyClientHandler:
package com.dahuyou.netty.sync.client;

import com.dahuyou.netty.sync.future.SyncWriteFuture;
import com.dahuyou.netty.sync.future.SyncWriteMap;
import com.dahuyou.netty.sync.msg.Response;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
Here SyncWriteFuture is an implementation of java.util.concurrent.Future. Its setResponse method, called above as future.setResponse(msg);, looks like this:
public void setResponse(Response response) {
    this.response = response;
    latch.countDown();
}
Once the response has been set, the thread blocked in the get method stops blocking and obtains the response:
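public Response get() throws InterruptedException, ExecutionException {
    // await(), not Object.wait(): wait() would throw IllegalMonitorStateException here
    latch.await();
    return response;
}

public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // await returns false if the timeout elapses before countDown() is called
    if (latch.await(timeout, unit)) {
        return response;
    }
    return null;
}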
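This gives us a synchronous, blocking wait for the response. Depending on your situation you can wait indefinitely or use the version with a timeout. Both waits are implemented with different methods of the latch (the countDown and await calls suggest a java.util.concurrent.CountDownLatch).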
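To make the latch mechanism concrete, here is a minimal, Netty-free sketch of the same idea. It is not the article's SyncWriteFuture: the class name LatchFutureSketch, the String payload, and the demo helper are all assumptions made for illustration, and a plain thread stands in for the Netty I/O thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for SyncWriteFuture: holds exactly one response. */
final class LatchFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    /** Called by the (simulated) I/O thread when the response arrives. */
    void setResponse(String response) {
        this.response = response;
        latch.countDown(); // wakes up any thread blocked in get()
    }

    /** Waits up to timeoutMillis; returns null on timeout, like the article's get(timeout, unit). */
    String get(long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? response : null;
    }

    /** Simulates a response that arrives after delayMillis while the caller waits timeoutMillis. */
    static String demo(long delayMillis, long timeoutMillis) {
        LatchFutureSketch future = new LatchFutureSketch();
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                future.setResponse("ok");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.setDaemon(true);
        io.start();
        try {
            return future.get(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(100, 1000)); // response arrives in time: prints ok
        System.out.println(demo(1000, 100)); // caller gives up first: prints null
    }
}
```

The countDown call happens-before the return of a successful await, so the waiting thread is guaranteed to see the response that was stored just before it.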
3:Test
- Server launcher class:
package com.dahuyou.netty.sync.test;

import com.dahuyou.netty.sync.server.ServerSocket;

public class StartServer {

    public static void main(String[] args) {
        new Thread(new ServerSocket()).start();
        System.out.println("netty server start done. {}");
    }
}
- Client launcher class:
package com.dahuyou.netty.sync.test;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import com.dahuyou.netty.sync.client.ClientSocket;
import com.dahuyou.netty.sync.future.SyncWrite;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;

public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        System.out.println("hi ...");
        ClientSocket client = new ClientSocket();
        new Thread(client).start();
        while (true) {
            try {
                // Fetch the future; the client thread needs some time to connect
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // Build the request
                Request request = new Request();
                request.setResult("query {} user info");
                SyncWrite s = new SyncWrite();
                // Response response = s.writeAndSync(future.channel(), request, 1000);
                Response response = s.writeAndSync(future.channel(), request, 4000);
                System.out.println("Received response: " + JSON.toJSON(response));
                // Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The call Response response = s.writeAndSync(future.channel(), request, 4000);
eventually blocks at Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
where it waits for the response. The code:
package com.dahuyou.netty.sync.future;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {
        ...
        Response response = doWriteAndSync(channel, request, timeout, future);
        ...
        return response;
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {
        ...
        // Blocks for up to timeout milliseconds while waiting for the response
        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        ...
        return response;
    }
}
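Since the listing above elides most of SyncWrite, the overall flow (register a waiter under a request ID, send, block with a timeout, clean up) can be sketched without Netty. This is only an illustration under stated assumptions, not the article's implementation: it swaps in the JDK's CompletableFuture for SyncWriteFuture, PENDING plays the role of SyncWriteMap.syncKey, a daemon thread fakes the server, and the names PendingRequestsSketch, onResponse, and the String payloads are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingRequestsSketch {
    // requestId -> future waiting for that response (assumed role of SyncWriteMap.syncKey)
    static final Map<String, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    /** What the inbound handler does: look up the waiter by request ID and complete it. */
    static void onResponse(String requestId, String body) {
        CompletableFuture<String> waiter = PENDING.get(requestId);
        if (waiter != null) { // null means the caller already timed out and cleaned up
            waiter.complete(body);
        }
    }

    /** Registers a waiter, "sends" the request, then blocks for at most timeoutMillis. */
    static String writeAndSync(String payload, long serverDelayMillis, long timeoutMillis) {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> waiter = new CompletableFuture<>();
        PENDING.put(requestId, waiter); // register before sending so no reply can be missed

        // Fake server: replies on another thread after serverDelayMillis.
        Thread server = new Thread(() -> {
            try {
                Thread.sleep(serverDelayMillis);
                onResponse(requestId, payload + " -> done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();

        try {
            return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        } finally {
            PENDING.remove(requestId); // always drop the entry so timed-out requests do not leak
        }
    }

    public static void main(String[] args) {
        // Scaled-down version of the article's timings: 3 s of work vs. 4 s and 2 s waits.
        System.out.println(writeAndSync("query", 300, 400));
        System.out.println(writeAndSync("query", 300, 200));
    }
}
```

With these timings the first call should normally return the reply and the second should report a timeout. Removing the map entry in the finally block matters in both cases, because a reply that arrives after the caller has given up then finds no waiter and is simply ignored.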
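The server currently takes 3 seconds to process a request and the client waits at most 4 seconds, so the response arrives normally, as the client output shows:
Now let's change the client's timeout to 2 seconds:
This time a timeout exception occurs:
That's all!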