kafka用java收发消息

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;public class KafkaTest {private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";private static final String TOPIC_NAME = "test";public static void main(String[] args) {// 生产者示例produceMessage();// 消费者示例consumeMessage();//从指定偏移量消费消息consumeOffsetMessage();}//生产者代码private static void produceMessage() {Properties props = new Properties();//acks是保证消息的发送机制,有以下几个值//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key和value序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类try (Producer<String, String> producer = new KafkaProducer<>(props)) {for (int i = 0; i < 10; i++) {String message = "Message " + i;//异步发送Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));System.out.println("Sent message: " + message);}}}//正常消费者代码private static void consumeMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {//设置kafak从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}//指定偏移量开始消费private static void consumeOffsetMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));//如果要指定偏移量,必须先poll一次,不然代码报错ConsumerRecords<String, String> poll = consumer.poll(0);System.out.println("poll:"+poll.isEmpty());//创建一个分区(参数为topic_name,和分区序号)TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);// 指定要消费的偏移量long offset = 3;//从指定偏移量开始消息消息consumer.seek(topicPartition, offset);while (true) {//设置kafka从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}}

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1424449.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot+vue+Mysql的大学生社团活动平台

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

电脑缺失api-ms-win-crt-runtime-l1-1-0.dll文件的几种修复方法

当您在使用电脑过程中遇到程序启动失败&#xff0c;提示缺少“api-ms-win-crt-runtime-l1-1-0.dll”文件时&#xff0c;不必过于焦虑&#xff0c;此问题通常与Windows系统的Visual C Redistributable组件未正确安装或损坏有关。小编将介绍5种修复电脑缺失api-ms-win-crt-runtim…

STM32-09-IWDG

文章目录 STM32 IWDG1. IWDG2. IWDG框图3. IWDG寄存器4. IWDG寄存器操作步骤5. IWDG溢出时间计算6. IWDG配置步骤7. 代码实现 STM32 IWDG 1. IWDG IWDG Independent watchdog&#xff0c;即独立看门狗&#xff0c;本质上是一个定时器&#xff0c;这个定时器有一个输出端&#…

webpack生成模块关系依赖图示例:查看构建产物的组成部分 依赖关系图

npm i -D webpack-bundle-analyzer core-js babel-loaderwebpack.config.js const BundleAnalyzerPlugin require(webpack-bundle-analyzer).BundleAnalyzerPlugin; module.exports {entry: ./src/index.js,output: {filename: main.js,},// mode: production, // 或者 produ…

学前端网络安全这块还不懂?细说CSRF

什么是CSRF&#xff1f; 举个栗子&#xff0c;比如我们需要在某个博客上删除一个文章&#xff0c;攻击者首先在自己的域构造一个页面&#xff0c;使用了一个img标签&#xff0c;其地址指向了删除博客的链接。攻击者诱使目标用户&#xff0c;也就是博客主访问这个页面&#xff…

如何利用命令提示符列出文件?这里提供了几个实例供你参考

序言 什么命令可以用来列出目录中的文件&#xff1f;如何在命令提示符Windows 10/11中列出文件&#xff1f;很多人对这些问题感到困惑。在这篇文章中&#xff0c;我们详细解释了命令提示符列出文件的主题。 CMD&#xff08;命令提示符&#xff09;是一个功能强大的Windows内置…

Android实践:查看Activity信息

问题&#xff1a;本地Android SDK的monitor无法正常运行&#xff0c;看不了进程相关信息&#xff0c;确认当前显示Activity十分不便 解决办法&#xff1a;使用adb shell指令可以快速查看 命令&#xff1a; adb shell dumpsys activity activities 这个命令用于获取Android设…

8个迹象表明你需要一台新笔记本电脑,看一下你的笔记本是否有其中一个

序言 当你第一次打开你的笔记本电脑的盒子时,它会以最高性能运行,电池寿命更长,过热最小,资源使用效率高。然而,随着笔记本电脑的老化,它将不能满足预期用途。以下几个迹象表明,可能是时候寻找并投资一款新设备了。 你的设备不再具有预期用途 如果你的笔记本电脑不再…

大模型日报2024-05-15

大模型日报 2024-05-15 大模型资讯 OpenAI推出全新AI模型GPT-4o&#xff0c;具备文本、图像和音频处理能力 摘要: OpenAI公司继ChatGPT后&#xff0c;最新推出了名为GPT-4o的AI模型。这一模型不仅能够理解和生成文本&#xff0c;还新增了图像和音频的解释及生成功能。GPT-4o作为…

战网国际服怎么下载 暴雪战网一键下载安装图文教程

战网国际版&#xff0c;或称为Battle.net全球版&#xff0c;是暴雪娱乐构建的一项跨越国界的综合游戏交流平台&#xff0c;它无视地理限制&#xff0c;旨在服务全球每一个角落的游戏爱好者。不同于地区专属版本&#xff0c;国际版为玩家开启了一扇无门槛的大门&#xff0c;让每…

一文搞懂CPU是如何进行计算的?

你好&#xff0c;我是 shengjk1&#xff0c;多年大厂经验&#xff0c;努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注&#xff01;你会有如下收益&#xff1a; 了解大厂经验拥有和大厂相匹配的技术等 希望看什么&#xff0c;评论或者私信告诉我&#xff01; 文章目录 一…

参考文献自检指南

参考文献作为论文的最后组成部分&#xff0c;可能不是加分项&#xff0c;但是做不好的话绝对会被吐槽&#xff0c;而且是个要命的减分项。因此要做好检查&#xff0c;以下是一些可以遵循的规范。&#xff08;如有疏漏&#xff0c;欢迎指出&#xff09; .bib文件 1.字段的选…

Java——运行环境搭建

操作步骤&#xff1a; JDK的下载环境变量PATH的配置JDK的重点目录 bin&#xff1a; 该目录主要存放命令lib&#xff1a;该目录主要存放JDK的类库lib/src.zip:JDK源码 下载JDK 官网地址&#xff1a;https://www.oracle.com/ 安装 双击.exe文件 安装完成&#xff01; 配置环…

docker安装minio附带图片

1.拉镜像 docker pull minio/minio 2.创建挂载点目录 mkdir -p /usr/local/minio/config mkdir -p /usr/local/minio/data 3.创建minio容器 docker run \ -p 19000:9000 \ -p 9090:9090 \ --nethost \ --name minio \ -d --restartalways \ -e "MINIO_ACCESS_KEYmini…

VUE之旅—day2

文章目录 Vue生命周期和生命周期的四个阶段created应用—新闻列表渲染mounted应用—进入页面搜索框就获得焦点账单统计&#xff08;Echarts可视化图表渲染&#xff09; Vue生命周期和生命周期的四个阶段 思考&#xff1a; 什么时候可以发送初始化渲染请求&#xff1f;&#xff…

学习笔记-C++

目录 1、何为常量 2、关键字 3、实型 4、水平制表符 5、string字符串 6、C中的三目运算符 7、随机数种子 8、结构体 9、各的区 10、引用 11、函数默认参数 12、函数占位参数 13、函数重载 14、私有属性 15、让另一个类作为本类的成员 16、声明和实现的文件…

Java开发大厂面试第03讲:线程的状态有哪些?它是如何工作的?

线程&#xff08;Thread&#xff09;是并发编程的基础&#xff0c;也是程序执行的最小单元&#xff0c;它依托进程而存在。一个进程中可以包含多个线程&#xff0c;多线程可以共享一块内存空间和一组系统资源&#xff0c;因此线程之间的切换更加节省资源、更加轻量化&#xff0…

win11快速安装mysql数据库系统

win11快速安装mysql数据库系统 1、下载 1.1 打开官网 1.2 向下滚动页面 1.3 进入下载选项 1.4 下载8.0.4 LTS 1.5 开始下载 1.6 下载中 2、解压 大家注意&#xff0c;此时解压后目录是没有data目录的。 3、数据库初始化 3.1 管理员身份打开CMD 开始菜单上&#xff0c;输入…

1. 杜克大学官方宣布2027届新生画像什么是vue关键特点核心概念简单示例生态系统

目录 1. 杜克大学官方宣布2027届新生画像 什么是vue 关键特点 核心概念 简单示例 生态系统 1. 杜克大学官方宣布2027届新生画像 杜克大学校报《The Chronicle》已连续第七年对杜克大学的一年级新生进行深入调查&#xff0c;探讨该群体家庭受教育背景、家庭收入水平以及…

SketchUp Pro 2024 mac草图大师 激活版 专业的3D建模软件

对于追求专业、高效的设计师们来说&#xff0c;SketchUp Pro 2024 for Mac无疑是最佳的选择。它提供了线条、形状、曲线、文本和图像等多种建模元素&#xff0c;让您能够根据需求自由创作。同时&#xff0c;软件还支持智能标注和尺寸功能&#xff0c;让建模过程更加精确、高效。…