Kafka入门:Java客户端库的使用

在现代的分布式系统中,消息队列扮演着至关重要的角色,而Apache Kafka以其高吞吐量、可扩展性和容错性而广受欢迎。本文将带你了解如何使用Kafka的Java客户端库来实现生产者(Producer)和消费者(Consumer)的基本操作。

Kafka简介

Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性和容错性,适用于处理实时数据。

环境准备

在开始之前,请确保你已经安装了以下环境:

  1. Java开发环境(JDK)
  2. Kafka服务器(可以是本地安装或远程服务器)
  3. Kafka客户端库(通常通过Maven或Gradle引入)

Maven依赖

如果你使用Maven来管理项目依赖,可以在pom.xml文件中添加以下依赖:

<dependencies><!-- Kafka客户端依赖 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version></dependency>
</dependencies>

Kafka生产者(Producer)

生产者负责将消息发送到Kafka的特定主题(Topic)。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);// 创建消息ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");// 发送消息producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Message sent successfully");}});// 关闭生产者producer.close();}
}

Kafka消费者(Consumer)

消费者负责从Kafka的特定主题(Topic)接收消息。

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 创建消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

运行截图

生产者:

消费者:

总结

通过上述示例,我们了解了如何使用Kafka的Java客户端库来创建生产者和消费者。这些基本操作是构建基于Kafka的分布式应用的基石。Kafka的强大功能远不止于此,包括但不限于消息持久化、分区、复制等高级特性,这些都需要在实际项目中根据具体需求进行深入学习和应用。

希望这篇文章能帮助你快速入门Kafka的Java客户端库使用。如果你有任何问题或需要进一步的指导,请随时留言讨论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/13013.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

使用 npm 安装 Yarn

PS E:\WeChat Files\wxid_fipwhzebc1yh22\FileStorage\File\2024-11\spid-admin\spid-admin> yarn install yarn : 无法将“yarn”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果包括路径&#xff0c;请确保路径正确&#xff0c;然后…

力扣617:合并二叉树

给你两棵二叉树&#xff1a; root1 和 root2 。 想象一下&#xff0c;当你将其中一棵覆盖到另一棵之上时&#xff0c;两棵树上的一些节点将会重叠&#xff08;而另一些不会&#xff09;。你需要将这两棵树合并成一棵新二叉树。合并的规则是&#xff1a;如果两个节点重叠&#…

谷歌浏览器支持的开发者工具详解

谷歌浏览器&#xff08;Google Chrome&#xff09;是全球最受欢迎的网页浏览器之一&#xff0c;它不仅提供了快速、安全的浏览体验&#xff0c;还为开发者提供了强大的开发者工具。本文将详细介绍如何使用谷歌浏览器的开发者工具&#xff0c;并解答一些常见问题。&#xff08;本…

HTB:OpenAdmin[WriteUP]

目录 连接至HTB服务器并启动靶机 使用nmap对靶机TCP端口进行开放扫描 继续使用nmap对靶机22、80端口进行脚本、服务扫描 使用Dirbuster对靶机网页路径进行递归扫描 ​编辑 尝试在searchsploit中搜索改WebAPP漏洞 横向移动(其实没有横) 启动Metasploit 特权提升 USER_…

IO作业5

设置双信号实现交替生产者线程和消费者线程 #include <myhead.h> int n0; pthread_mutex_t fastmutex;//定义互斥锁 pthread_cond_t cond;//定义条件变量 pthread_cond_t cond2; void *product() {int i;for(i0;i<10;i){n;printf("我生产了一台特斯拉n%d\n"…

Web3.0安全开发实践|BNB链安全开发,这10大实用tips一定要收藏

BNB Chain是Web3世界中最受欢迎的区块链之一&#xff0c;其费用合理、交易迅速以及项目生态系统丰富几大原因吸引了广大用户。与任何的区块链都一样&#xff0c;BNB Chain上的开发者在开发过程中首先考虑的应该是安全问题&#xff1a;因为任何资金的损失都会导致用户对协议及平…

QT开发笔记之小知识

QCoreApplication::aboutToQuit 主事件循环退出前发出的信号&#xff0c;是程序退出前等待QT线程退出回收资源的神器。 官方帮助文档 [signal] void QCoreApplication::aboutToQuit() 该信号在应用程序即将退出主事件循环时发出&#xff0c;例如&#xff1a;当事件循环级别降至…

插入排序(C语言)

直接插入排序的基本思想&#xff1a;把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。 一、步骤 1.给定一个乱序的数组&#xff0c;如 从第一个元素开始排序&#xff0c;当只…

文心一言 VS 讯飞星火 VS chatgpt (389)-- 算法导论25.1 2题

二、为什么要求对于所有的 1 ⩽ i ⩽ n 1⩽i⩽n 1⩽i⩽n&#xff0c;有 w i i 0 w_{ii}0 wii​0 &#xff1f;如果要写代码&#xff0c;请用go语言。 文心一言&#xff1a; 在许多数学和计算应用中&#xff0c;要求矩阵 W W W 的对角线元素 w i i 0 w_{ii} 0 wii​0 是…

Java多线程详解⑦(全程干货!!!)内存可见性 || volatile || JMM || wait notify notifyAll

这里是Themberfue 在上一节中&#xff0c;我们讨论了死锁的概念&#xff0c;产生的场景 &#xff0c;产生的必要条件...... 内存可见性 我们先来看一段百度百科关于 "内存可见性" 的解释 "内存可见性" 就是造成线程安全问题的原因之一 如果是单纯地看…

安装双系统(linux操作系统(debian)安装)

参考博客&#xff1a;戴尔服务器安装Debian11过程_戴尔t130安装debian-CSDN博客 一.腾出一个50G以上的空间&#xff0c;准备装操作系统 1.底部搜索计算机管理&#xff0c;选择磁盘管理 本人已预留400GB磁盘空间安装ubuntu系统&#xff0c;若没有预留空间&#xff0c;则可以选…

心系天下三星W25:记录盛世影像 见证华彩时光

悠然岁月中&#xff0c;被定格的瞬间总是历久弥新。心系天下三星W25以传世经典之姿跃然于掌中&#xff0c;将精致外形与精湛工艺合二为一&#xff0c;彰显出持有者的高雅气质。同时&#xff0c;强悍的影像系统则使之成为光影艺术的记录者&#xff0c;无论是捕捉人生风华&#x…

修改msyql用户密码及更新mysql密码策略

查看mysql中初始的密码策略 SHOW VARIABLES LIKE validate_password% 2. 修改默认密码策略 -- 0 或者 LOW 只验证长度-- 1 或者 MEDIUM 验证长度、数字、大小写、特殊字符-- 2 或者 STRONG 验证长度、数字、大小写、特殊字符、字典文件set global validate_password.policy0;…

f.lux电脑屏幕护眼神器,告别熬夜伤眼

前言 之前分享了一款护眼宝&#xff0c;也是可以调节屏幕色温&#xff0c;但是都是需要手动调节 今天分享一款通过智能调节屏幕亮度&#xff0c;减少长时间使用电脑对眼睛的伤害。它可以根据环境光线和用户的使用习惯&#xff0c;自动调整屏幕亮度&#xff0c;确保用户在不同时…

小家电器产品三维动画渲染怎么做快一些?

在快节奏的市场竞争中&#xff0c;快速制作小家电器产品的三维动画渲染显得尤为重要。本文将为您揭示如何高效完成这一过程&#xff0c;让您的产品以最直观的方式吸引消费者的目光。 一、电器产品动画渲染需要软件 原文出自 电器产品三维动画渲染怎么做-电器产品3D动画渲染需要…

Cesium基础-(Entity)-(model )

里边包含Vue、React框架代码详细步骤、以及代码详细解释 公众号:GISer世界 三维模型地址 :https://download.csdn.net/download/weixin_44857463/89986869 效果: 以下是Cesium中Model的属性和方法: 属性 属性名称类型默认值描述urlstring | Resource.gltf或.glb文件的URLb…

RAG综述:《A Comprehensive Survey of Retrieval-Augmented Generation (RAG)》

来源于《A Comprehensive Survey of Retrieval-Augmented Generation (RAG): Evolution, Current Landscape and Future Directions》 一、RAG所解决的问题 如何有效地从外部知识源检索相关信息&#xff0c;如何将这些信息无缝地融入到生成文本中&#xff0c;以及如何在保证生…

带你读懂什么是AI Agent智能体

一、智能体的定义与特性 定义&#xff1a;智能体是一个使用大语言模型&#xff08;LLM&#xff09;来决定应用程序控制流的系统。然而&#xff0c;智能体的定义并不唯一&#xff0c;不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义&#xff0c;但更…

数据库类型介绍

1. 关系型数据库&#xff08;RDBMS&#xff09; 关系型数据库是最常见的一类数据库&#xff0c;它们通过表&#xff08;Table&#xff09;来存储数据&#xff0c;表之间通过关系&#xff08;如主键和外键&#xff09;来关联。 • MySQL&#xff1a;开源的关系型数据库管理系统&…

【红帽Linux】简述Linux文件系统结构

原创 厦门微思网络 Linux 文件系统结构划分清晰、功能明确&#xff0c;每个目录都有特定的用途。以下是各个主要目录的介绍&#xff1a; n/bin: 包含系统启动和单用户模式下的基本命令的二进制文件&#xff0c;例如常见的基本命令 ls 和 cp。 n/boot: 保存与系统启动相关的文…