SpringBoot Kafka发送消息与接收消息实例

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>SpringBootKafka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafka</name><description>SpringBootKafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- pom.xml --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency></dependencies><repositories><repository><id>central</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:kafka:bootstrap-servers: 192.168.110.105:9092#streams:#application-id: my-streams-appconsumer:group-id: myGroupIdauto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;@Slf4j
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;private final ObjectMapper objectMapper;@Autowiredpublic KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {this.kafkaTemplate = kafkaTemplate;this.objectMapper = objectMapper;}public void sendMessage(String message) {log.info("KafkaProducer message:{}", message);//kafkaTemplate.send("test", message).addCallback();Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});// 使用whenComplete方法completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});/*future.whenComplete((result, ex) -> {if (ex == null) {// 成功发送RecordMetadata metadata = result.getRecordMetadata();System.out.println("Message sent successfully with offset: " + metadata.offset());} else {// 发送失败System.err.println("Failed to send message due to: " + ex.getMessage());}});*/}public void sendUser(User user) throws JsonProcessingException {//final ProducerRecord<String, String> record = createRecord(data);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);String userJson = objectMapper.writeValueAsString(user);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);/*future.addCallback(success -> System.out.println("Message sent successfully: " + userJson),failure -> System.err.println("Failed to send message: " + failure.getMessage()));*/CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});}
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "myGroupId")public void consume(String message) {System.out.println("Received message: " + message);log.info("KafkaConsumer message:{}", message);}
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class MessageController {private final KafkaProducer producer;@Autowiredpublic MessageController(KafkaProducer producer) {this.producer = producer;}@GetMapping("/send-message")public String sendMessage() {log.info("MessageController sendMessage start!");producer.sendMessage("hello, Kafka!");log.info("MessageController sendMessage end!");return "Message sent successfully.";}@GetMapping("/send")public String sendMessage1() {log.info("MessageController sendMessage1 start!");User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();try {producer.sendUser(user);} catch (JsonProcessingException e) {throw new RuntimeException(e);}log.info("MessageController sendMessage1 end!");return "Message sendMessage1 successfully.";}
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/142318.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Qt --- 信号和信号槽

前言 Linux信号Signal&#xff0c;系统内部的通知机制&#xff0c;进程间通信方式。 信号源&#xff1a;谁发的信号。 信号的类型&#xff1a;哪种类别的信号。 信号的处理方式&#xff1a;注册信号处理函数&#xff0c;在信号被触发的时候自动调用执行。 Qt中的信号和Lin…

利士策分享,中秋佳节:月满人团圆的文化传承与演绎

利士策分享&#xff0c;中秋佳节&#xff1a;月满人团圆的文化传承与演绎 在中国丰富多彩的传统节日中&#xff0c;中秋节以其独特的魅力&#xff0c;承载着深厚的文化底蕴和民族情感。 这一节日的起源&#xff0c;宛如一幅缓缓展开的历史画卷&#xff0c;融合了古人对天象的…

栈、队列、树、哈希表

栈 先进后出&#xff0c;添加元素直接memcpy 到对应数组位置就可以&#xff0c;top是栈中存储的元素个数&#xff0c;最后一个元素下标为top-1&#xff1b; 删除元素时直接top--&#xff1b; 后面添加进入的数据会覆盖原来在栈上被删除的数据。 main.c符号匹配 链栈 main.c 队…

为什么说开放式耳机比入耳式的好?学生党必入的蓝牙耳机推荐

因为开放式耳机相比入耳式耳机更具优势&#xff0c;具体如下&#xff1a; 佩戴舒适度更高&#xff1a; 开放式耳机通常不需要插入耳道&#xff0c;不会对耳道产生压迫&#xff0c;长时间佩戴耳朵不易感到闷热、疼痛或不适&#xff0c;减少了对耳部的物理压迫和摩擦&#xff0…

深入浅出Docker

1. Docker引擎 Docker引擎是用来运行和管理容器的核心软件。通常人们会简单的将其指代为Docker或Docker平台。 基于开放容器计划&#xff08;OCI&#xff09;相关的标准要求&#xff0c;Docker引擎采用了模块化的设计原则&#xff0c;其组件是可替换的。 Docker引擎由如下主…

形态学的基本操作在图片中的应用

一、形态学——腐蚀操作 &#xff08;缩小、变细&#xff09; import cv2 import numpy as npimg_pig cv2.imread(pig.png) cv2.imshow(image_pig,img_pig) cv2.waitKey(0) cv2.destroyAllWindows()def cv_show(name, img):cv2.imshow(name, img)cv2.waitKey(0)cv2.destroyAl…

element-plus的菜单组件el-menu

菜单是几乎是每个管理系统的软件系统中不可或缺的&#xff0c;element-plus提供的菜单组件可以快速完成大部分的菜单的需求开发&#xff0c; 该组件内置和vue-router的集成&#xff0c;使用起来很方便。 主要组件如下 el-menu 顶级菜单组件 主要属性 mode:决定菜单的展示模式…

MySQL篇(高级字符串函数/正则表达式)(持续更新迭代)

目录 讲点一&#xff1a;高级字符串函数 一、简介 二、常见字符串函数 1. CONCAT() 2. SUBSTRING() 3. LENGTH() 4. REPLACE() 5. TRIM() 6. UPPER() 7. LOWER() 8. LEFT() 9. RIGHT() 10. INSTR() 11. LENTH(str) 讲点二&#xff1a;正则表达式 一、简介 二、…

Defining Additional PhysicalConstraints

步骤3&#xff1a;定义附加物理 约束条件 在此步骤中&#xff0c;您将定义设计的其他物理约束&#xff0c;例如 PACKAGE_PIN和禁止约束。 1.选择布局→I/O规划&#xff0c;从布局选择器打开I/O规划视图布局 在工具栏菜单中。 I/O规划视图布局显示包窗口以及I/O端口和 封装引脚窗…

华为地图服务 - 如何开启和展示“我的位置”? -- HarmonyOS自学10

一. 场景介绍 本章节将向您介绍如何开启和展示“我的位置”功能&#xff0c;“我的位置”指的是进入地图后点击“我的位置”显示当前位置点的功能。效果如下&#xff1a; 二. 接口说明 “我的位置”功能主要由MapComponentController的方法实现&#xff0c;更多接口及使用方法…

学习笔记(一)

前言 一、对象 1、由类建模而成&#xff0c;是消息、数据和行为的组合 2、可以接收和发送消息&#xff0c;并利用消息进行彼此的交互。消息要包含传送给对象接收的信息 3、类的实例化&#xff1a;把类转换为对象的过程叫类的实例化。 4、对象的特性 (1) 对象有状态&#…

QUIC的loss detection学习

PTO backoff backoff 补偿 /ˈbkɒf/PTO backoff 是QUIC&#xff08;Quick UDP Internet Connections&#xff09;协议中的一种机制&#xff0c;用于处理探测超时&#xff08;Probe Timeout, PTO&#xff09;重传策略 它逐步增加探测超时的等待时间&#xff0c;以避免网络拥塞…

【FreeRTOS】任务

1.使用stm32cubemx配置freertos 2.创建任务 我们需要在MX_FREERTOS_Init()里面创建任务 我们根据上面的任务创建方式&#xff0c;实现GPIO_PIN_10的反转 1.任务句柄 2.任务结构体 3.任务执行函数 4.任务函数声明 5.创建线程执行任务 hal_delay和osDelay区别&#xff1f;…

Qt (17)【Qt 文件操作 读写保存】

阅读导航 引言一、Qt文件概述二、输入输出设备类三、文件读写类四、文件和目录信息类五、自定义“记事本” 引言 在上一篇文章中&#xff0c;我们学习了Qt的事件处理机制&#xff0c;知道了如何响应用户的操作。但应用程序常常还需要处理文件&#xff0c;比如读写数据。所以&a…

应用软件系统开发 实操一:任务需求描述

一、实操一&#xff1a;任务需求描述 软件和信息技术服务业数据统计平台是一个为不同级别管理员提供定制化服务的系统。它主要面向数据管理员和系统运维管理员&#xff0c;每一种用户角色各自拥有特定的功能权限。系统运维管理员是专门针对平台基础功能的管理人员&#xff0c;它…

08_Python数据类型_字典

Python的基础数据类型 数值类型&#xff1a;整数、浮点数、复数、布尔字符串容器类型&#xff1a;列表、元祖、字典、集合 字典 字典&#xff08;Dictionary&#xff09;是一种可变容器模型&#xff0c;它可以存储任意类型对象&#xff0c;其中每个对象都存储为一个键值对。…

如何评估土壤功能?瓦赫宁根大学研究团队在土壤学一区TOP期刊最新成果给出答案!

本文首发于“生态学者”微信公众号&#xff01; 土壤健康是农业可持续发展的关键因素之一&#xff0c;而土壤有机碳&#xff08;Soil Organic Carbon, SOC&#xff09;含量是衡量土壤健康最常用的指标。然而&#xff0c;许多土壤功能不仅受SOC总量的影响&#xff0c;还与其质量…

FreeRTOS学习——链表list

FreeRTOS学习——链表&#xff08;列表&#xff09;list&#xff0c;仅用于记录自己阅读与学习源码 FreeRTOS Kernel V10.5.1 参考大佬的好文章&#xff1a; freertos内核原理 Day1(链表) FreeRTOS-链表的源码解析 *list_t只能存储指向list_item_t的指针。每个list_item_t都…

photozoom classic 9解锁码2024年最新25位解锁码

photozoom classic 9 破解版顾及比恐龙还要稀有&#xff0c;我曾经和你一样一直再找&#xff0c;找了好几个月&#xff0c;也没有找到真的破解版&#xff0c;下载很多次&#xff0c; 都是病毒插件之类的 我昨天下了几次&#xff0c;没有一个不附带插件病毒木马的.......&#x…

基于深度学习的图像分类或识别系统(含全套项目+PyQt5界面)

目录 一、项目界面 二、代码实现 1、网络代码 2、训练代码 3、评估代码 4、结果显示 三、项目代码 一、项目界面 二、代码实现 1、网络代码 该网络基于残差模型修改 import torch import torch.nn as nn import torchvision.models as modelsclass resnet18(nn.Modul…