SpringKafka生产者、消费者消息拦截

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/2142.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【初阶数据结构篇】链式结构二叉树(二叉链)的实现(感受递归暴力美学)

文章目录 须知 &#x1f4ac; 欢迎讨论&#xff1a;如果你在学习过程中有任何问题或想法&#xff0c;欢迎在评论区留言&#xff0c;我们一起交流学习。你的支持是我继续创作的动力&#xff01; &#x1f44d; 点赞、收藏与分享&#xff1a;觉得这篇文章对你有帮助吗&#xff1…

el-talble selection行 初始默认勾选

导言 el-talble selection 行&#xff08;选择列&#xff09;用于显示复选框&#xff0c;让用户可以选择或取消选择某些表格行&#xff0c;常用于批量操作场景。 刚刚试了下&#xff0c;想加深印象记录一下当学习碎片。参考的是表格多选并根据每行值初始化选中状态&#xff08;…

RabbitMQ交换机类型

RabbitMQ交换机类型 1、RabbitMQ工作模型2、RabbitMQ交换机类型2.1、Fanout Exchange&#xff08;扇形&#xff09;2.1.1、介绍2.1.2、示例2.1.2.1、生产者2.1.2.2、消费者2.1.2.3、测试 2.2、Direct Exchange&#xff08;直连&#xff09;2.2.1、介绍2.2.2、示例2.2.2.1、生产…

数据结构---排序(上)

一.直接插入排序 思想&#xff1a;将一个个未排序的数字插入到已经排好顺序的数组中。 例如&#xff1a; 思路&#xff1a;先将前两个数字排序&#xff0c;然后将后面数字与前面数字比较排序。 操作&#xff1a; 1.引入变量 i 遍历数组[1&#xff0c;array.lenth] 2.用临时…

ai翻唱部分步骤

模型部署 我是用的RVC进行的训练&#xff0c;也可以使用so-vits-svc。 通过百度网盘分享的文件&#xff1a;RVC-beta 链接&#xff1a;https://pan.baidu.com/s/1c99jR2fLChoqUFqf9gLUzg 提取码&#xff1a;4090 以Nvida显卡为例&#xff0c;分别下载“RVC1006Nvidia”和…

C++的stack和Queue

1.简单实现stack 构建一个模板&#xff0c;俩个参数&#xff0c;这里第一个一般是数据的类型&#xff0c;第二个是由什么来实现栈&#xff0c;在主函数里传了int和vector<int>&#xff0c;第二个不传参也可以&#xff0c;因为是缺省参数&#xff0c;默认为vector&#x…

默认路由:实现内网所有网段流量走一条默认路由访问外网

默认路由 Tip&#xff1a;默认路由一般指出口网关设备的出口路由。实现所有网段流量都走一条路由。 实验模拟&#xff1a;公司内部pc 通过出口网关 访问运营商内部 baidu服务 isp网关配置&#xff1a; <Huawei>sy Enter system view, return user view with CtrlZ. …

蘑菇书(EasyRL)学习笔记(2)

1、序列决策 1.1、智能体和环境 如下图所示&#xff0c;序列决策过程是智能体与环境之间的交互&#xff0c;智能体通过动作影响环境&#xff0c;环境则返回观测和奖励。智能体的目标是从这些反馈中学习出能最大化长期奖励的策略&#xff0c;这一过程通过不断试错和调整实现强化…

【C语言刷力扣】28.找出字符串中第一个匹配项的下标

题目&#xff1a; 解题思路&#xff1a; 暴力算法 int strStr(char* haystack, char* needle) {int n strlen(haystack), m strlen(needle);for (int i 0; i m < n; i) {bool res true;for (int j 0; j < m; j) {if (haystack[ji] ! needle[j]) {res false;break…

电脑没有下载声卡驱动怎么办?电脑声卡驱动安装方法

在日常使用电脑的过程中&#xff0c;我们可能会遇到电脑没有声音的问题&#xff0c;这往往与声卡驱动缺失或损坏有关。声卡驱动是连接电脑硬件&#xff08;声卡&#xff09;与操作系统之间的桥梁&#xff0c;确保音频信号能够正常输入输出。那么&#xff0c;当电脑没有声卡驱动…

favicon是什么文件?如何制作网站ico图标?

一般我们做网站的话&#xff0c;都会制作一个独特的ico图标&#xff0c;命名为favicon.ico。这个ico图标一般会出现在浏览器网页标题前面。如下图红色箭头所示&#xff1a; 部分博客导航大全也会用到所收录网站的ico图标。比如boke123导航新收录的网站就不再使用网站首页缩略图…

路由策略与路由控制

1. 路由控制概述 2. 路由控制工具 2.1 路由匹配工具 访问控制列表&#xff08;Access Control List, ACL&#xff09;是一个匹配工具。 由若干条 permit / deny 组成的ACL规则组成。 ACL 匹配原则&#xff1a; 一旦命中即停止匹配 ACL在做路由匹配时&#xff0c;更多是匹配…

天生倔强脸的白纸新人,徐畅演艺生涯初舞台获得肯定!

国内首档“微短剧综艺”创新真人秀《开播&#xff01;短剧季》已播出四期&#xff0c;节目集结20余位青年演员竞演角逐&#xff0c;进行经典IP的“二度创作”&#xff0c;最终实现短剧IP孵化。在最新一期正片中&#xff0c;新人演员徐畅凭借一段《离婚律师》的试镜表演&#xf…

Spring Boot解决 406 错误之返回对象缺少Getter/Setter方法引发的问题

目录 前言1. 问题背景2. 问题分析2.1 检查返回对象 3. 解决方案3.1 确保Controller返回Result类型3.2 测试接口响应 4. 原理探讨5. 常见问题排查与优化建议结语 前言 在Spring Boot开发中&#xff0c;接口请求返回数据是系统交互的重要环节&#xff0c;尤其在开发RESTful风格的…

第二十八天|贪心算法|122.买卖股票的最佳时机II,55. 跳跃游戏,45.跳跃游戏II,1005.K次取反后最大化的数组和

目录 122.买卖股票的最佳时机II 方法1&#xff1a;贪心算法&#xff08;简单&#xff09; 方法2&#xff1a;动态规划 55. 跳跃游戏 45.跳跃游戏II 方法1 方法2&#xff08;简洁版&#xff09; 1005.K次取反后最大化的数组和 按照绝对值大小从大到小排序一次 两次排序…

PureMVC在Unity中的使用(含下载链接)

前言 Pure MVC是在基于模型、视图和控制器MVC模式建立的一个轻量级的应用框架&#xff0c;这种开源框架是免费的&#xff0c;它最初是执行的ActionScript 3语言使用的Adobe Flex、Flash和AIR&#xff0c;已经移植到几乎所有主要的发展平台&#xff0c;支持两个版本框架&#xf…

Python CGI编程-cookie的设置、检索

设置检索 其他&#xff1a; 1. http.cookies和http.cookiejar区别&#xff1a; http.cookies主要用于创建和操作单个cookie对象&#xff0c;适用于需要精细控制单个cookie属性的场景。http.cookiejar则用于管理多个cookie&#xff0c;适用于需要自动处理多个请求和响应中的coo…

算法实现 - 快速排序(Quick Sort) - 理解版

文章目录 算法介绍算法分析核心思想三个版本运行过程挖坑法Hoare 原版前后指针法 算法稳定性和复杂度稳定性时间复杂度平均情况O(nlogn)最差情况O( n 2 n^2 n2) 空间复杂度 算法介绍 快速排序是一种高效的排序算法&#xff0c;由英国计算机科学家C. A. R. Hoare在1960年提出&a…

探索Python新境界:Buzhug库的神秘面纱

文章目录 探索Python新境界&#xff1a;Buzhug库的神秘面纱第一部分&#xff1a;背景介绍第二部分&#xff1a;Buzhug库是什么&#xff1f;第三部分&#xff1a;如何安装Buzhug库&#xff1f;第四部分&#xff1a;Buzhug库函数使用方法第五部分&#xff1a;Buzhug库使用场景第六…

Samtec 技术大咖说 | PCB VS 电缆背板?

【摘要/前言】 选择背板设计需要对特定的网络拓扑结构和应用进行权衡。在某些情况下&#xff0c;对PCB与电缆背板的评估不是 "非此即彼"&#xff0c;而是一种组合方式。 Samtec的工程师Andrew Josephson、Brandon Gore和Jonathan Sprigler进行了一次讨论&#xff0c…