gRPC-拦截器

简介

在构建 gRPC 应用程序时,无论是客户端应用程序,还是服务器端应用程序,在远程方法执行之前或之后,都可能需要执行一些通用逻辑。

gRPC 提供了简单的 API,用来在客户端和服务器端的 gRPC 应用程序中实现并安装拦截器。它是 gRPC 核心扩展机制之一,在一些使用场景中(如日志、身份验证、授权、性能度量指标、跟踪以及其他一些自定义需求),拦截器拦截每个 RPC 调用的执行,可以使用拦截器进行日志记录、身份验证/授权、指标收集以及许多其他可以跨 RPC 共享的功能。

在 gRPC 应用程序中,拦截器根据拦截的 RPC 调用类型可以分为以下的两大类:
第一个是一元拦截器(unary interceptor),它拦截一元 RPC 的调用;
第二个是流拦截器(streaming interceptor),它处理流式 RPC 的调用;
客户端和服务端都可使用一元拦截器和流拦截器。

一元拦截器

客户端
public static void main(String[] args) {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().intercept(new ClientLoggingInterceptor()).build();ProductInfoGrpc.ProductInfoBlockingStub stub = ProductInfoGrpc.newBlockingStub(channel);ProductId productId = ProductId.newBuilder().setValue("1").build();Product product = stub.getProduct(productId);System.out.println("product.getName() = " + product.getName());channel.shutdown();
}
public static class ClientLoggingInterceptor implements ClientInterceptor{@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel next) {System.out.println("执行ClientLoggingInterceptor拦截器...");ClientCall<ReqT, RespT> clientCall = next.newCall(methodDescriptor, callOptions);// 调用下一个拦截器return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {@Overridepublic void start(Listener<RespT> responseListener, Metadata headers) {// 在调用开始前执行System.out.println("客户端调用:" + methodDescriptor.getFullMethodName());super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {@Overridepublic void onMessage(RespT message) {// 收到响应后执行System.out.println("服务端返回:" + message);super.onMessage(message);}}, headers);}};}
}
服务端
public void start() throws IOException {int port = 50050;server = ServerBuilder.forPort(port).addService(new ProductInfoImpl()).intercept(new ServerExecuteTimeInterceptor()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {ProductInfoServer.this.stop();}));System.out.println("server start on port 50050");
}
public static class ServerExecuteTimeInterceptor implements ServerInterceptor{@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers, ServerCallHandler<ReqT, RespT> next) {String methodName = serverCall.getMethodDescriptor().getFullMethodName();System.out.println("receive request :" + methodName);ServerCall.Listener<ReqT> listener = next.startCall(serverCall, headers);return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {long start=0, end=0;String method  = methodName;@Overridepublic void onHalfClose() {System.out.println("client half close");super.onHalfClose();}@Overridepublic void onCancel() {System.out.println("client cancel");super.onCancel();}@Overridepublic void onComplete() {System.out.println("call complete");super.onComplete();end = System.currentTimeMillis();System.out.println("请求:"+method+"耗时:" + (end-start));}@Overridepublic void onMessage(ReqT message) {System.out.println("收到客户端消息:" + message);super.onMessage(message);start = System.currentTimeMillis();}};}
}

流拦截器

客户端
public static void main(String[] args) throws Exception{CountDownLatch countDownLatch = new CountDownLatch(1);ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().intercept(new ClientStreamingInterceptor()).build();ProductInfoGrpc.ProductInfoStub stub = ProductInfoGrpc.newStub(channel);StreamObserver<Product> requestObserver = stub.saveProductBatch(new StreamObserver<ProductResult>() {@Overridepublic void onNext(ProductResult productResult) {System.out.println("服务端返回:" + productResult.getSuccess());}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();}});for (int i = 0; i < 10; i++) {Product p = Product.newBuilder().setId(""+i).setName("p"+i).build();System.out.println("客户端发送:" + p);requestObserver.onNext(p);}requestObserver.onCompleted();System.out.println("客户端发送完成");countDownLatch.await();
}
public static class ClientStreamingInterceptor implements ClientInterceptor{@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {System.out.println("执行ClientStreamingInterceptor拦截器...");//把自己开发的ClientStreamTracerFactory融入到gRPC体系callOptions = callOptions.withStreamTracerFactory(new ClientStreamTracer.Factory() {@Overridepublic ClientStreamTracer newClientStreamTracer(ClientStreamTracer.StreamInfo info, Metadata headers) {return new ClientStreamTracer() {@Override//用于输出响应头public void outboundHeaders() {System.out.println("client: 用于输出请求头.....");super.outboundHeaders();}@Override//设置消息编号public void outboundMessage(int seqNo) {System.out.println("client: 设置流消息的编号: " + seqNo);super.outboundMessage(seqNo);}@Overridepublic void outboundUncompressedSize(long bytes) {System.out.println("client: 获得未压缩消息的大小:" + bytes);super.outboundUncompressedSize(bytes);}@Override//用于获得 输出消息的大小public void outboundWireSize(long bytes) {System.out.println("client: 用于获得 输出消息的大小:" + bytes);super.outboundWireSize(bytes);}@Override//拦截消息发送public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {System.out.println("client: 监控请求操作 outboundMessageSent:" + seqNo);super.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);}//inbound  对于相应相关操作的拦截@Overridepublic void inboundHeaders() {System.out.println("用于获得响应头....");super.inboundHeaders();}@Overridepublic void inboundMessage(int seqNo) {System.out.println("获得响应消息的编号..." + seqNo);super.inboundMessage(seqNo);}@Overridepublic void inboundWireSize(long bytes) {System.out.println("获得响应消息的大小... " + bytes);super.inboundWireSize(bytes);}@Overridepublic void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {System.out.println("集中获得消息的编号 ,大小 ,未压缩大小..." + seqNo +" " + optionalWireSize +" "+ optionalUncompressedSize);super.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);}@Overridepublic void inboundUncompressedSize(long bytes) {System.out.println("获得响应消息未压缩大小..." + bytes);super.inboundUncompressedSize(bytes);}@Overridepublic void inboundTrailers(Metadata trailers) {System.out.println("响应结束..");super.inboundTrailers(trailers);}};}});return next.newCall(method, callOptions);}
}
服务端
public void start() throws IOException {int port = 50050;server = ServerBuilder.forPort(port).addService(new ProductInfoImpl()).addStreamTracerFactory(new ServerStreamingInterceptor()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {ProductInfoServer.this.stop();}));System.out.println("server start on port 50050");
}
public static class ServerStreamingInterceptor extends ServerStreamTracer.Factory{@Overridepublic ServerStreamTracer newServerStreamTracer(String s, Metadata metadata) {return new ServerStreamTracer(){@Overridepublic void inboundMessage(int seqNo) {super.inboundMessage(seqNo);}@Overridepublic void inboundWireSize(long bytes) {super.inboundWireSize(bytes);}@Overridepublic void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {System.out.println("server: 获得client发送的请求消息 ..." + seqNo+","+optionalWireSize+","+optionalUncompressedSize);super.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);}@Overridepublic void inboundUncompressedSize(long bytes) {super.inboundUncompressedSize(bytes);}//outbound 拦截请求@Overridepublic void outboundMessage(int seqNo) {super.outboundMessage(seqNo);}@Overridepublic void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {System.out.println("server: 响应数据的拦截 ..." + seqNo+","+optionalWireSize+","+optionalUncompressedSize);super.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);}@Overridepublic void outboundWireSize(long bytes) {super.outboundWireSize(bytes);}@Overridepublic void outboundUncompressedSize(long bytes) {super.outboundUncompressedSize(bytes);}};}
}

完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/grpc-demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/2733.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

C# 日志框架 NLog、log4net 和 Serilog对比

文章目录 前言NLog、log4net 和 Serilog 三个框架的详细对比:一、NLog优点:缺点:二、 log4net优点缺点三、Serilog优点缺点四、Serilog使用举例总结前言 NLog、log4net 和 Serilog 三个框架的详细对比: NLog、log4net 和 Serilog 是三个非常流行的 .NET 日志框架,它们各自…

本地缓存库分析(四):fastcache

文章目录 本系列前言设计索引和数组怎么判断是否被覆盖其他问题 源码走读数据结构setget 总结 本系列 本地缓存库分析&#xff08;一&#xff09;&#xff1a;golang-lru本地缓存库分析&#xff08;二&#xff09;&#xff1a;bigcache本地缓存库分析&#xff08;三&#xff0…

安科瑞5G基站直流叠光监控系统-安科瑞黄安南

基站现状和趋势 5G基站是专门提供5G网络服务的公用移动通信基站。5G基站主要用于提供5G空口协议功能&#xff0c;支持与用户设备、核心网之间的通信。按照逻辑功能划分&#xff0c;5G基站可分为5G基带单元与5G射频单元&#xff0c;二者之间可通过CPRI或eCPRI接口连接。 2019年…

Pr 视频效果:过渡

效果面板/视频效果/过渡 Video Effects/Transition Adobe Premiere Pro 的视频效果中&#xff0c;过渡 Transition效果组用于创建单个剪辑内过渡效果的一组视频效果。这些效果可以增强视频的视觉连贯性&#xff0c;添加创意性的视觉转换&#xff0c;为观众提供流畅的观看体验。…

DataX 的安装配置和使用 (详细版)

1&#xff0c;上传解压 1&#xff0c;开始上传安装包到你虚拟机上放置安装包的文件夹 2&#xff0c;开始解压 ,配置环境变量 1、上传 /opt/modules 2、解压 tar -zxvf datax.tar.gz -C /opt/installs 3、修改 vi /etc/profile 配置环境变量&#xff1a; export DAT…

zookeeper安装

安装之前&#xff1a;先关闭三台服务器的防火墙&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; systemctl stop firewalld systemctl disable firewalld 1)上传 /opt/modules下面 2&#xff09;解压 /opt/installs下面 tar -zxvf zookeeper-3.4.10.tar.gz …

Nature文章《deep learning》文章翻译

这篇文章是对Nature上《deep learning》文章的翻译。原作者 Yann LeCun, Yoshua Bengio& Geoffrey Hinton。 这篇文章的中心思想是深入探讨深度学习在机器学习中的革命性贡献&#xff0c;重点介绍其在特征学习、监督学习、无监督学习等方面的突破&#xff0c;并阐述其在图…

动态规划—整数拆分

class Solution {public int integerBreak(int n) {int[] dp new int[n1];dp[2] 1;for(int i 3; i< n; i){for(int j 1; j< i/2; j){//j拆i&#xff0c;只需要遍历到 i/2 就可以&#xff0c;后面没有必要遍历dp[i] Math.max(dp[i], Math.max(j*(i-j) , j*dp[i-j]));…

OceanBase V4.3.3,首个面向实时分析场景的GA版本发布

在10月23日举办的 OceanBase年度发布会 上&#xff0c;我们怀着激动之情&#xff0c;正式向大家宣布了 OceanBase 4.3.3 GA 版的正式发布&#xff0c;这也是OceanBase 为实时分析&#xff08;AP&#xff09;场景打造的首个GA版本。 2024 年初&#xff0c;我们推出了 4.3.0 版本…

儿童安全座椅行业全面深入分析

儿童安全座椅就是一种专为不同体重&#xff08;或年龄段&#xff09;的儿童设计&#xff0c;将孩子束缚在安全座椅内&#xff0c;能有效提高儿童乘车安全的座椅。欧洲强制性执行标准ECE R44/03的定义是&#xff1a;能够固定到机动车辆上&#xff0c;带有ISOFIX接口、LATCH接口的…

算法笔记:Day-09(初始动态规划)

509. 斐波那契数 斐波那契数 &#xff08;通常用 F(n) 表示&#xff09;形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始&#xff0c;后面的每一项数字都是前面两项数字的和。也就是&#xff1a; F(0) 0&#xff0c;F(1) 1 F(n) F(n - 1) F(n - 2)&#xff0c;其中 …

HTTP和HTTPS 的作用和应用场景 (python 爬虫简单入门)

HTTP和HTTPS HTTP HTTP协议&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;&#xff1a;是一种发布和接收 HTML页面的方法。 HTTP的端口号为80 HTTPS HTTPS&#xff08;Hypertext Transfer Protocol over Secure Socket Layer&#xff09;…

Java多线程编程(三)一>详解synchronized, 死锁,wait和notify

目录&#xff1a; 一.synchronized 的使用&#xff1a; 二. 常见死锁情况&#xff1a; 三 .如何避免死锁&#xff1a; 四.wait和notify 一.synchronized 的使用&#xff1a; 我们知道synchronized锁具有互斥的特点&#xff1a; synchronized 会起到互斥效果, 某个线程…

linux入门——“初识make”

make是linux中的自动化构建工具&#xff0c;一般来说系统会自带make&#xff0c;如果没有&#xff0c;那么可以使用命令“sudo apt install -y make”来安装。 1.初识make make使用的前提是维护makefile/Makefile文件&#xff0c;需要在自己的目录下自己创建。 我在此目录下创…

【K8S系列】Kubernetes 中 Pod 无法通过 Service 名称访问服务的 DNS 解析失败问题【已解决】

在 Kubernetes 中&#xff0c;Service 提供了一种稳定的方式&#xff0c;通过名称访问一组 Pod。当其他 Pod 无法通过 Service 名称访问服务&#xff0c;并且出现 DNS 解析失败时&#xff0c;通常会导致应用无法正常工作。本文将详细分析此问题的常见原因及其解决方案。 一、问…

关于分布式事务,你知道多少?如何落地?

很多人估计会说&#xff0c;我在项目中完全没有涉及到过分布式事务&#xff0c;而面试官老喜欢问&#xff0c;真TM烦&#xff01; 本文就来聊聊分布式事务&#xff0c;有哪些方案和实现。文章有点长&#xff0c;可以先收藏&#xff0c;有时间了慢慢看。 什么是事务&#xff1f;…

SIwave:释放 Resonant Mode Solver 的强大功能

SIwave 是一种电源完整性和信号完整性工具。本文的重点是 Resonant 模式求解器。 进行谐振计算的主要原因是确定 Powerplane 中 Cap 去耦的最佳位置。Powerplane 的大小由最大预期电流和允许的最大电压降决定。然而&#xff0c;即使是最好的设计也没有足够的电容来将宽带频谱的…

【VS+QT】联合开发踩坑记录

0. 写在前面 因为目前在做自动化产线集成软件开发相关的工作&#xff0c;需要用到QT&#xff0c;所以选择了VS联合开发&#xff0c;方便调试。学习QT的过程中也踩了很多坑&#xff0c;在此记录一下&#xff0c;提供给各位参考。 1. 环境配置 Win11Visual Studio 2019Qt 5.12…

【LeetCode】每日一题 2024_11_1 超级饮料的最大强化能量(DP)

前言 每天和你一起刷 LeetCode 每日一题~ LeetCode 启动&#xff01; 题目&#xff1a;超级饮料的最大强化能量 代码与解题思路 先读题&#xff1a; 题目给了两个数组&#xff0c;长度为 n&#xff0c;题目要求在 n 个小时内选择饮料&#xff0c;一个小时可以选一瓶&#x…

IBM服务器修改IMM的IP方法

服务器设备&#xff1a;IBM x3550 M4 Server IMM默认IP地址&#xff1a;192.168.70.125 用户名&#xff1a;USERID 密码&#xff1a;PASSW0RD&#xff08;注意是零0&#xff09; 1.服务器开机按F1进入BIOS界面 2.进入System Settings 3.进入Integrated Management Module 4.…