使用微服务Spring Cloud集成Kafka实现异步通信

在微服务架构中,使用Spring Cloud集成Apache Kafka来实现异步通信是一种常见且高效的做法。Kafka作为一个分布式流处理平台,能够处理高吞吐量的数据,非常适合用于微服务之间的消息传递。

微服务之间的通信方式包括同步通信和异步通信。

1)同步通信:通常通过HTTP RESTful API或RPC(远程过程调用)实现。服务消费者通过发送HTTP请求到服务提供者,服务提供者处理请求后返回响应。这种方式简单直接,但可能会受到网络延迟和并发量的影响。

同步通信的实现代码参见博文:微服务3:微服务间接口远程调用(同步通信方式)-CSDN博客

2)异步通信:通过消息队列(如RabbitMQ、Kafka等)实现。服务消费者将消息发送到队列中,服务提供者从队列中拉取消息并进行处理。这种方式实现了服务之间的解耦,提高了系统的可扩展性和容错性。但也需要考虑消息的顺序性、一致性和可靠性等问题。

1、本文目标

本文的目标是使用微服务Spring Cloud集成Kafka实现异步通信。本文实现了一个简单的Kafka Producer微服务,连接至部署再Ubuntu系统上的Kafka Server,同时在Ubuntu通过命令行终端启动一个监听的消费者,当通过浏览器测试接口想Kafka Producer微服务发送一条消息,Kafka Producer微服务即刻将该消息发送至Ubuntu系统上的Kafka Server,同时在Kafka consumer终端上可收到并显示出该消息。具体系统架构如下图所示。

部署Kafka Server和Kafka consumer,参见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的实现,参见博文:

微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

2、创建Kafka Producer

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka -DarchetypeArtifactId=maven-archetype-quickstart

完整代码的目录如下:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka:

bootstrap-servers: 192.168.23.131:9092其中192.168.23.131是Kafka Server的IP地址。

server:port: 8020
spring:application:name: microservice-kafkakafka:bootstrap-servers: 192.168.23.131:9092producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: alleureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

KafkaController.java的完整代码如下:

package com.test;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.*;@RequestMapping("/kafka")
@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("sendMsg")public String helloProducer(String msg){kafkaTemplate.send("mydemo1",msg);return "ok";}}

启动Kafka Producer 和Eureka

mvn spring-boot:run

3、启动Kafka Server及Consumer

bin/kafka-server-start.sh config/server.properties&

创建主题

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

在命令行终端启动消费者

bin/kafka-console-consumer.sh --bootstrap-server demo1:9092 --topic mydemo1

4、浏览器测试

在浏览器输入:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

此时在Ubuntu的Consumer终端可以看到从浏览器输入的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1550808.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ansible之playbook\shell\script模块远程自动安装nginx

通过shell模块&#xff0c; 编写安装nginx脚本&#xff0c;为yaml脚本&#xff0c;远程到135机器上安装并启动nginx - hosts: 192.168.45.135remote_user: roottasks:- name: 安装Nginx依赖环境和库文件yum: namewget,tar,make,gcc,pcre-devel,pcre,zlib-devel stateinstalle…

tr命令:替换文本中的字符

一、命令简介 ​tr​ 命令用于转换或删除文件中的字符。它可以从标准输入中读取数据&#xff0c;对数据进行字符替换、删除或压缩&#xff0c;并将结果输出到标准输出。 ‍ 二、命令参数 格式 tr [选项] [集合1] [集合2]选项和参数 ​ ​-c​​: 指定 集合 1 的补集。​ …

【STM32开发笔记】移植AI框架TensorFlow到STM32单片机【下篇】

【STM32开发笔记】移植AI框架TensorFlow到STM32单片机【下篇】 一、上篇回顾二、项目准备2.1 准备模板项目2.2 支持计时功能2.3 配置UART4引脚2.4 支持printf重定向到UART42.5 支持printf输出浮点数2.6 支持printf不带\r的换行2.7 支持ccache编译缓存 三、TFLM集成3.1 添加tfli…

“卷”智能, 从高质量算力开始

算力即国力&#xff0c;这已是产业共识。 当人工智能浪潮席卷全球之际&#xff0c;大家深刻感受到发展算力产业的重要性和紧迫性&#xff0c;高质量的人工智能算力已经与国家竞争、产业升级和企业转型息息相关。 去年&#xff0c;《算力基础设施高质量发展行动计划》的颁布&a…

springboot整合MybatisPlus+MySQL

上一篇&#xff1a;springboot整合sentinel和对feign熔断降级 文章目录 一、准备二、主要工作三、具体步骤3.1 准备数据库环境3.20 pre引入依赖3.2 引入依赖3.3 bootstrap.yml配置mybatisplus3.40 pre引入service、mapper3.4 引入实体类、service、mapper 四、测试目录结构 五…

InnoDB 死锁

文章目录 死锁案例等待超时时间InnoDB 状态信息死锁日志 死锁检测死锁日志分析 死锁是指多个事务无法继续进行的情况&#xff0c;因为每个事务都持有另一个事务所需的锁。因为所有涉及的事务都在等待同一资源可用&#xff0c;所以它们都不会释放它所持有的锁。 当事务锁定多个…

MongoDB 工具包安装(mongodb-database-tools)

首先到官网下载工具包&#xff0c;进入下面页面&#xff0c;复制连接地址&#xff0c;使用wget下载 cd /usr/local/mongodb5.0.14/wget https://fastdl.mongodb.org/tools/db/mongodb-database-tools-rhel70-x86_64-100.6.1.tgz 安装 tar -zxvf mongodb-database-tools-rhel70-…

Python库matplotlib之五

Python库matplotlib之五 小部件(widget)RangeSlider构造器APIs应用实列 TextBox构造器APIs应用实列 小部件(widget) 小部件(widget)可与任何GUI后端一起工作。所有这些小部件都要求预定义一个Axes实例&#xff0c;并将其作为第一个参数传递。 Matplotlib不会试图布局这些小部件…

LeetCode 热题 100 回顾2

干货分享&#xff0c;感谢您的阅读&#xff01;原文见&#xff1a;LeetCode 热题 100 回顾_力code热题100-CSDN博客 一、哈希部分 1.两数之和 &#xff08;简单&#xff09; 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标…

自制CANTool_DBC_Layout仿制_布局读取Signal(三)

1、读取DBC中解析格式空格问题报错解决方法 原来解析方式&#xff1a;BO_ 258 EPS_CANFD_StrWhlASts: 8 Test 有的DBC中数据格式&#xff1a;BO_ 80 GW_50: 8 GW &#xff08;多了一个空格&#xff09; 解析匹配规则修订为&#xff1a; string MessageRegex "BO…

手机改IP地址怎么弄?全面解析与操作指南

在当今数字化时代&#xff0c;IP地址作为设备在网络中的唯一标识&#xff0c;其重要性不言而喻。有时候&#xff0c;出于隐私保护、网络访问需求或其他特定原因&#xff0c;我们可能需要更改手机的IP地址。然而&#xff0c;对于大多数普通用户来说&#xff0c;如何操作可能还是…

一文说完c++全部基础知识,IO流(二)

一、IO流 流、一连串连续不断的数据集合。 看下图&#xff0c;继承关系 using namespace 流类的构造函数 eg:ifstream::ifstream (const char* szFileName, int mode ios::in, int); #include <iostream> #include <fstream> using namespace std; int main()…

云计算 Cloud Computing

文章目录 1、云计算2、背景3、云计算的特点4、云计算的类型&#xff1a;按提供的服务划分5、云计算的类型&#xff1a;按部署的形式划分 1、云计算 定义&#xff1a; 云计算是一种按使用量付费的模式&#xff0c;这种模式提供可用的、便捷的、按需的网络访问&#xff0c;进入可…

0基础学习QT——配置开发环境

大纲 安装Qt配置Visual Studio 2022安装插件配置 测试 Qt框架&#xff0c;以其跨平台、高性能以及丰富的UI组件库而著称&#xff0c;是开发图形用户界面应用程序的理想选择。Visual Studio 2022提供了对Qt项目的深度支持&#xff0c;包括智能代码提示、代码导航、调试工具等&am…

矩阵奇异值

一、ATA 任给一个矩阵A&#xff0c;都有&#xff1a; ATA 为一个对称矩阵 例子&#xff1a;A为一个mn的矩阵&#xff0c;A的转置为一个nm的矩阵 对称矩阵的重要性质如下&#xff1a; ① 对称矩阵的特征值全为实数&#xff08;实数特征根&#xff09; ② 任意一个n阶对称矩阵…

基于微信小程序的旧衣回收系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

C++ string的基本运用详细解剖

string的基本操作 一.与C语言中字符串的区别二.标准库中的string三.string中常用接口的介绍1.string中常用的构造函数2.string类对象的容量操作函数3.string类对象的访问及遍历操作4.string类对象的修改操作5.string类的非成员函数6.string中的其他一些操作 一.与C语言中字符串…

Windows下VScode快速配置OpenCV开发环境 【快乐篇】

1.前言 由于业务需要本人通过vscode快速迭代配置了一版OpenCV的开发环境&#xff0c;因为要快所以直接用大佬们构建好的openCV就行。本人这里是64位的Window11下配置的。 2.前置工具 vscode IDE工具 3.安装VScode插件 C/CC/C Extension PackC/C ThemesCMakeCMake Tools 4.…

云服务架构与华为云架构

目录 1.云服务架构是什么&#xff1f; 1.1 云服务模型 1.2 云部署模型 1.3 云服务架构的组件 1.4 云服务架构模式 1.5 关键设计考虑 1.6 优势 1.7 常见的云服务架构实践 2.华为云架构 2.1 华为云服务模型 2.2 华为云部署模型 2.3 华为云服务架构的核心组件 2.4 华…

MySQL-MySQL访问

文章目录 前言一、使用步骤1.MYSQL *mysql_init(MYSQL *mysql);2.MYSQL *mysql_real_connectint mysql_query(MYSQL *mysql, const char *q);MYSQL_RES *mysql_store_result(MYSQL *mysql);my_ulonglong mysql_num_rows(MYSQL_RES *res);unsigned int mysql_num_fields(MYSQL_R…