netty之实现同步调用

写在前面

源码 。
本文看下netty如何实现同步调用,netty本身是不具备这种能力的,但是我们可以结合juc包的相关工具类来让其具备这种能力。接下来就一起看下吧!!!

1:server

  • 请求和响应对象
package com.dahuyou.netty.sync.msg;public class Request {private String requestId;private Object result;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}@Overridepublic String toString() {return "Request{" +"requestId='" + requestId + '\'' +", result=" + result +'}';}
}
package com.dahuyou.netty.sync.msg;public class Response {private String requestId;private String param;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getParam() {return param;}public void setParam(String param) {this.param = param;}}

server 启动类:

public class ServerSocket implements Runnable {private ChannelFuture f;@Overridepublic void run() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch){ch.pipeline().addLast(new RpcDecoder(Request.class),new RpcEncoder(Response.class),new MyServerHandler());}});ChannelFuture f = null;f = b.bind(7397).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}}

MyServerHandler:

public class MyServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj){Request msg = (Request) obj;//反馈Response request = new Response();request.setRequestId(msg.getRequestId());request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{}。");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}ctx.writeAndFlush(request);//释放ReferenceCountUtil.release(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}}

在消息处理类中我们休眠3秒,来模拟消息的处理耗时,后续可以用来测试客户端的同步等待,以及等待超时等场景。

2:client

client启动类:

package com.dahuyou.netty.sync.client;import com.dahuyou.netty.sync.codec.RpcDecoder;
import com.dahuyou.netty.sync.codec.RpcEncoder;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class ClientSocket implements Runnable {private ChannelFuture future;@Overridepublic void run() {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new RpcDecoder(Response.class),new RpcEncoder(Request.class),new MyClientHandler());}});ChannelFuture f = b.connect("127.0.0.1", 7397).sync();this.future = f;f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}public ChannelFuture getFuture() {return future;}public void setFuture(ChannelFuture future) {this.future = future;}
}

MyClientHandler:

package com.dahuyou.netty.sync.client;import com.dahuyou.netty.sync.future.SyncWriteFuture;
import com.dahuyou.netty.sync.future.SyncWriteMap;
import com.dahuyou.netty.sync.msg.Response;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class MyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {Response msg = (Response) obj;String requestId = msg.getRequestId();SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);if (future != null) {future.setResponse(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}

其中SyncWriteFuture是java.util.concurrent.Future的子类,方法future.setResponse(msg);如下:

public void setResponse(Response response) {this.response = response;latch.countDown();
}

设置响应结果后,就会使阻塞在方法get的数据读取线程结束阻塞,获取response:

public Response get() throws InterruptedException, ExecutionException {latch.wait();return response;
}public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (latch.await(timeout, unit)) {return response;}return null;
}

这样就实现了同步的阻塞等待response了,你可以根据具体情况使用永久等待,或者带有超时的版本。这里等待是通过栅栏类的不同方法来实现的。

3:测试

  • 服务端启动类:
package com.dahuyou.netty.sync.test;import com.dahuyou.netty.sync.server.ServerSocket;public class StartServer {public static void main(String[] args) {new Thread(new ServerSocket()).start();System.out.println("netty server start done. {}");}}
  • 客户端启动类
package com.dahuyou.netty.sync.test;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import com.dahuyou.netty.sync.client.ClientSocket;
import com.dahuyou.netty.sync.future.SyncWrite;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;public class StartClient {private static ChannelFuture future;public static void main(String[] args) {System.out.println("hi ...");ClientSocket client = new ClientSocket();new Thread(client).start();while (true) {try {//获取future,线程有等待处理时间if (null == future) {future = client.getFuture();Thread.sleep(500);continue;}//构建发送参数Request request = new Request();request.setResult("查询{}用户信息");SyncWrite s = new SyncWrite();
//                Response response = s.writeAndSync(future.channel(), request, 1000);Response response = s.writeAndSync(future.channel(), request, 4000);System.out.println("收到响应:" + JSON.toJSON(response));
//                Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}}}

代码Response response = s.writeAndSync(future.channel(), request, 4000);最终会阻塞在方法Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);等待响应结果,代码:

package com.dahuyou.netty.sync.future;import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import com.dahuyou.netty.sync.msg.Request;
import com.dahuyou.netty.sync.msg.Response;import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class SyncWrite {public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {...Response response = doWriteAndSync(channel, request, timeout, future);...return response;}private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {...// 这个会阻塞等待timeout毫秒数Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);...return response;}
}

当前server设置3秒执行时长,而client设置最多等待4秒,所以是可以正常响应的,如下client输出:
在这里插入图片描述
我们来修改客户端超时等待为2秒:
在这里插入图片描述
此时就会发生超时异常了:
在这里插入图片描述
全文done!!!

写在后面

参考文章列表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1523777.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

算法笔试-编程练习-M-01-24

t这套题&#xff0c;偏向灵活&#xff0c;更多的考察了数学、贪心 一、质因数 题目描述 小乖对 gcd (最大公约数) 很感兴趣, 他会询问你t次。 每次询问给出一个大于 1 的正整数 n, 你是否找到一个数字m(2 ≤m ≤ n)&#xff0c;使得 gcd(n,m)为素数. 注&#xff1a;原题为给…

智能优化算法-北方苍鹰优化算法(NGO)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 北方苍鹰优化算法 (Northern Goshawk Optimizer, NGO) 是一种基于群体智能的元启发式优化算法&#xff0c;它模拟了北方苍鹰&#xff08;Northern Goshawk&#xff09;的捕食行为、领地行为以及社交互动&#x…

网络攻击全解析:主动、被动与钓鱼式攻击的深度剖析

在当今这个互联网高度普及与深度融合的时代&#xff0c;网络攻击&#xff0c;这一赛博空间的隐形威胁&#xff0c;正以前所未有的频率和复杂度挑战着网络安全乃至国家安全的底线。为了更好地理解并防范这些威胁&#xff0c;本文将深入剖析网络攻击的主要类型——主动攻击、被动…

2024-8-28作业C++/QT

代码&#xff1a; #include <iostream> #include <cstring> #include <array> #include <iomanip> using namespace std; int main() { //array<char,128> a; //array<char,128>::iterator iter; string str; getline(c…

小阿轩yx-Kubernertes日志收集

小阿轩yx-Kubernertes日志收集 前言 在 Kubernetes 集群中如何通过不同的技术栈收集容器的日志&#xff0c;包括程序直接输出到控制台日志、自定义文件日志等 有哪些日志需要收集 日志收集与分析很重要&#xff0c;为了更加方便的处理异常 简单总结一些比较重要的需要收集…

插件千兆网络变压器72PIN应用图片和设计H87202D

华强盛电子导读&#xff1a;前面199中间2643后面0038 千兆4口网络变压器是一种常用于网络通信领域的电子元件&#xff0c;它可以将高频率的信号进行隔离和滤波&#xff0c;保护网络设备免受电磁干扰&#xff0c;同时也能确保信号的稳定传输。这种网络变压器通常具有多个端口&am…

【云原生kubernetes系列之SkyWalking篇】

1、实战案例 1.1单体jar包监控 1.1.1Halo环境准备 注意&#xff1a;Halo需要jdk11以上的版本 apt install -y java-11-openjdk mkdir /apps/halo -p && cd /apps/halo curl -L https://github.com/halo-dev/halo/releases/download/v1.5.4/halo-1.5.4.jar --outpu…

AI创业者必看!免费分享大模型和算法备案的难点解析

大模型和算法的备案&#xff0c;作为人工智能产品进入市场的第一道门槛&#xff0c;对于每一个创业者来说&#xff0c;都是一个必须认真对待的重要环节。备案不仅要求技术的合规性&#xff0c;还强调了数据安全和隐私保护的重要性。创业者在追求技术创新的同时&#xff0c;也需…

9千含读音文件的中文汉语学习ACCESS\EXCEL数据库

现在英语在我们国内是越来越流行&#xff0c;甚至幼儿园都开始Hello了&#xff0c;但是我们也看到越来越多的老外在学我们的汉语、汉字了。而《含读音文件的中文汉语学习ACCESS数据库》就是一份供老外学习汉字汉语的工具。 数据库收集了上万条汉语常用的字词&#xff0c;并且用…

redhat7.9安装zsh以及常用插件

1 安装zsh并更改默认终端 #1.安装软件包 yum -y install zsh git#2.更改默认终端 chsh -s /bin/zsh然后再退出下终端&#xff0c;重新登录用echo $SHELL 查看环境是否是/bin/zsh 2 配置oh-my-zsh #1.从git仓库中拉取oh-my-zsh git clone https://gitee.com/mirrors/oh-my-z…

xss-labs靶场全关通关

1、level-1 1、输入&#xff0c;发现会将我们输入的内容显示&#xff1a; 2、若未做任何过滤就进行输出&#xff0c;那我们就可以嵌入js代码&#xff0c;执行js脚本&#xff1a; 输入&#xff1a;<script>alert(111)</script> <script></script>&…

LaTeX中的\sloppy命令详解及应用实例

诸神缄默不语-个人CSDN博文目录 在使用 LaTeX 排版文档时&#xff0c;有时候我们会遇到一些段落中的文字或 URL 超出页边距的情况&#xff0c;导致文档版式不够美观。在这种情况下&#xff0c;LaTeX 提供了一些命令来处理这些排版问题&#xff0c;其中一个非常实用的命令就是 …

leetcode172. 阶乘后的零,遍历每个因数中5的个数

leetcode172. 阶乘后的零 给定一个整数 n &#xff0c;返回 n! 结果中尾随零的数量。 提示 n! n * (n - 1) * (n - 2) * … * 3 * 2 * 1 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;0 解释&#xff1a;3! 6 &#xff0c;不含尾随 0 示例 2&#xff1a; 输…

代码随想录算法day29 | 动态规划算法part02 | 62.不同路径,63. 不同路径 II

62.不同路径 力扣题目链接(opens new window) 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。 问…

828华为云征文|使用sysbench对Mysql应用加速测评

文章目录 ❀前言❀测试环境准备❀测试工具选择❀测试工具安装❀mysql配置❀未开启Mysql加速测试❀开启Mysql加速测试❀总结 ❀前言 大家好&#xff0c;我是早九晚十二。 昨天有梳理一篇关于华为云最新推出的云服务器产品Flexus云服务器X。当时有说过&#xff0c;这次的华为云F…

Kubernetes 简介及部署方法

目录 1 Kubernetes 简介及原理 1.1 应用部署方式演变 1.2 容器编排应用 1.3 kubernetes 简介 1.4 K8S的设计架构 1.5 K8S 各组件之间的调用关系 1.6 K8S 的 常用名词感念 1.7 k8S的分层架构 2 K8S 集群环境搭建 2.1 k8s 中容器的管理方式 2.2 k8s中使用的几种管理容器的介绍 3 …

JavaScript 中,File、Blob、Base64 和 ArrayBuffer 不同的用途和转换方法。

JavaScript 中&#xff0c;File、Blob、Base64 和 ArrayBuffer 不同的用途和转换方法。 图示&#xff1a; 文章目录 Blob:File:Base64:Buffer:ArrayBuffer:常用转换函数base64ToFiledataURLtoBlobbinaryToDataURL导出 csv导出文件常用 office 文件对应的 MIME TYPE 和后缀 Bl…

万维网服务器工作

万维网服务器&#xff08;WWW服务器&#xff09;的工作方式主要基于客户机/服务器&#xff08;Client/Server&#xff09;模式&#xff0c;通过HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;等协议与客户端&#xff08;如网页浏览器&…

U盘不小心格式化了怎么恢复?别慌!教你快速恢复

在日常工作和生活中&#xff0c;U盘已成为我们存储和传输数据的重要工具。然而&#xff0c;有时由于误操作或其他原因&#xff0c;我们可能会不小心格式化U盘&#xff0c;导致重要数据的丢失。这时&#xff0c;如何恢复这些数据就显得尤为重要。下面&#xff0c;我们将介绍几种…

黑神话悟空·幽魂怎么打?超爽攻略!

在正式打法之前&#xff0c;这里推荐一款巨好用的开放式耳机&#xff0c;能够让我们的游戏更加的畅快&#xff01; 有条件的也建议使用南卡OE MIX这款开放式耳机&#xff0c;也非常适合在游戏时使用&#xff0c;不入耳的设计避免了长时间游戏佩戴可能带来的耳道不适和耳朵胀痛…