【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;public class _03CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();//连接kafka setProperty和put都行properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("bigdata");// list可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取分区0的offset =100 以后的数据kafkaConsumer.seek(new TopicPartition("bigdata",0),100);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/13048.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

第七届中国国际进口博览会 2024长三角G60科创走廊高质量发展要素对接大会举行

11月9日&#xff0c;第七届中国国际进口博览会2024长三角G60科创走廊高质量发展要素对接大会在国家会展中心举行。会上&#xff0c;G60科创走廊九城市共赴进博之约&#xff0c;以全面深化改革为动力&#xff0c;深入推进长三角G60科创走廊走深走实&#xff0c;携手推动科技创新…

一键P图新神器!SeedEdit让图片编辑更快捷!

大家还记得MJ推出的图片编辑功能吗&#xff1f;只需涂抹想要修改的区域&#xff0c;再输入提示词&#xff0c;就能一键更改原图。而现在&#xff0c;豆包大模型团队也推出了类似的功能——SeedEdit&#xff0c;而且更加简单快捷&#xff01; Ai 智能办公利器 - Ai-321.com 人工…

【数据分析与数据挖掘】决策树算法

决策树的构造的本质是利用训练数据构造一棵决策树&#xff0c;然后利用这棵树所提炼出来的规则进行预测。算法过程大体分为两步&#xff1a;1.利用训练数据构造决策树 2.利用构造的决策树进行预测 分类相关知识 信息熵 信息熵由香农提出&#xff0c;用来衡量事件的不确定性的…

丹摩征文活动|智谱AI引领是实现文本可视化 - CogVideoX-2b 部署与使用

文章目录 前言一、DAMODEL平台特性二、创建CPU云实例三、CogVedioX介绍四、DAMODEL一键部署CogVideoX1. 创建丹摩实例(参考上述介绍)2. 配置环境和依赖3. 模拟与配置文件4. 开始运行4.1 调试4.2 webUI4.3 端口映射 前言 DAMODEL&#xff08;丹摩智算&#xff09;是一款专为满足…

22. 记录架构

文章目录 第22章 记录架构22.1 架构文档的用途和受众22.2 符号表示22.3 视图模块视图组件和连接器视图C&C 视图的符号表示 分配视图质量视图 22.4 组合视图22.5 记录行为22.6 视图之外22.7 记录基本原理22.8 架构的利益相关者22.9 实际考虑建模工具在线文档、超文本和维基遵…

Python练习19

Python日常练习 题目&#xff1a; 打印如下九九乘法表 1*11 2*12 2*24 3*13 3*26 3*39 4*14 4*28 4*312 4*416 5*15 5*210 5*315 5*420 5*525 6*16 6*212 6*318 6*424 6*530 6*636 7*17 7*214 7*321 7*428 7*535 7*642 7*749 8*18 8*216 8*324 8*432 8*540 8*648 8*756 8*86…

卸载 Python

文章目录 WindowsmacOSLinux总结 卸载 Python 的方法取决于你使用的操作系统。以下是针对不同操作系统的卸载步骤&#xff1a; Windows 打开控制面板&#xff1a; 按 Win R 打开运行对话框&#xff0c;输入 control&#xff0c;然后按 Enter。或者&#xff0c;从开始菜单搜索并…

使用Wireshark获取USB HID(Human Interface Device)报告描述符

使用Wireshark选择需要获取的USB进行抓取数据&#xff0c;找到设备&#xff08;host&#xff09;接收信息的数据 第二栏出现hid报告&#xff0c;右击选择复制流 将复制的内容粘贴到USB标准请求及描述符在线分析工具 - USB中文网 进行解析 以图中获取手写板的数据为例&#xff…

【深度学习】LSTM、BiLSTM详解

文章目录 1. LSTM简介&#xff1a;2. LSTM结构图&#xff1a;3. 单层LSTM详解4. 双层LSTM详解5. BiLSTM6. Pytorch实现LSTM示例7. nn.LSTM参数详解 1. LSTM简介&#xff1a; LSTM是一种循环神经网络&#xff0c;它可以处理和预测时间序列中间隔和延迟相对较长的重要事件。LSTM通…

PyQt5 加载UI界面与资源文件

步骤一: 使用 Qt Designer 创建 XXX.ui文件 步骤二: 使用 Qt Designer 创建 资源文件 步骤三: Python文件中创建相关类, 使用 uic.loadUi(mainwidget.ui, self ) 加载UI文件 import sys from PyQt5 import QtCore, QtWidgets, uic from PyQt5.QtCore import Qt f…

ENSP作业——小型园区网

题目 根据上图&#xff0c;可得需求为&#xff1a; 1.配置交换机上的VLAN及IP地址。 2.设置SW1为VLAN 2/3的主根桥&#xff0c;设置SW2为VLAN 20/30的主根桥&#xff0c;且两台交换机互为主备。 3.可以使用super vlan。&#xff08;本次实验中未使用&#xff09; 4.上层通过静…

计算机网络:运输层 —— 运输层端口号

文章目录 运输层端口号的分类端口号与应用程序的关联应用举例发送方的复用和接收方的分用 运输层端口号的分类 端口号只具有本地意义&#xff0c;即端口号只是为了标识本计算机网络协议栈应用层中的各应用进程。在因特网中不同计算机中的相同端口号是没有关系的&#xff0c;即…

【C++练习】使用C++编写程序计算π的近似值

题目&#xff1a;使用C编写程序计算π的近似值 描述&#xff1a; 编写一个C程序&#xff0c;使用一个特定的数学公式来计算圆周率&#xff08;π&#xff09;的近似值。该程序定义了一个函数calculatePi()&#xff0c;该函数通过一个迭代算法和一个涉及反正切函数&#xff08;…

Hook小程序

下载&#xff1a; https://github.com/JaveleyQAQ/WeChatOpenDevTools-Python 配置&#xff1a; pip install -r requirements 实现&#xff1a; 开启小程序开发者模式&#xff0c;类似浏览器F12 效果&#xff1a; 使用&#xff1a; 退出微信&#xff0c;进入安装的目录…

如何在pycharm中 判断是否成功安装pytorch环境

1、在电脑开始端&#xff0c;找到 2、打开后 在base环境下 输入conda env list 目前我的环境中没有pytorch 学习视频&#xff1a;【Anaconda、Pytorch、Pycharm到底是什么关系?什么是环境?什么是包?】https://www.bilibili.com/video/BV1CN411s7Ue?vd_sourcefad0750b8c6…

AI陪伴走热,网易云信“融合通讯+AI”新方案发布!附场景App及源码

信息秒回、不会失联、724h 情感陪伴、随时提供情绪价值......在 AI 能力越来越强大的今天&#xff0c;我们开始有了“AI 助手”、“AI 搭子”&#xff0c;甚至开始谈起“AI 男友/女友”&#xff0c;AI 的角色早已超越了简单的生产力工具&#xff0c;它正深入到我们生活的方方面…

力扣 LeetCode 704. 二分查找(Day1:数组)

解题思路&#xff1a; 二分查找主要分为[ left , right ]左闭右闭和[ left , right )左闭右开两种 此处采取[ left , right ]左闭右闭写法 注意&#xff1a; 1. right的初始化取值 2. while中取等 3. right mid -1 ; class Solution {public int search(int[] nums, i…

java-AOP编程示例

SpringBoot工程&#xff0c;有不懂的留言or Kimi一下 LogAspect.java package com.xxx.javaaopdemo.Aspect;import com.xxx.javaaopdemo.LogAnnotation; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang…

Kafka入门:Java客户端库的使用

在现代的分布式系统中&#xff0c;消息队列扮演着至关重要的角色&#xff0c;而Apache Kafka以其高吞吐量、可扩展性和容错性而广受欢迎。本文将带你了解如何使用Kafka的Java客户端库来实现生产者&#xff08;Producer&#xff09;和消费者&#xff08;Consumer&#xff09;的基…

使用 npm 安装 Yarn

PS E:\WeChat Files\wxid_fipwhzebc1yh22\FileStorage\File\2024-11\spid-admin\spid-admin> yarn install yarn : 无法将“yarn”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果包括路径&#xff0c;请确保路径正确&#xff0c;然后…