kafka及异步通知文章上下架

1)自媒体文章上下架

需求分析

2)kafka概述

消息中间件对比

特 性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

开 发 语 言

java

erlang

java

scala

单 机 吞 吐 量

万级

万级

10万级

100万级

时 效 性

ms

us

ms

ms级以内

可 用 性

高(主从)

高(主从)

非常高(分布 式)

非常高(分布 式)

功 能 特 性

成熟的产品、较全的 文档、各种协议支持 好

并发能力强、 性能好、延迟 低

MQ功能比较 完善,扩展性 佳

只支持主要的MQ功能, 主要应用于大数据领域

消息中间件对比-选择建议

消息中间 件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的 RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apach e.org/

kafka介绍-名词解释

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

3)kafka安装配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

https://blog.csdn.net/m0_70325779/article/details/137248462

4)kafka入门

生产者发送消息,多个消费者只能有一个消费者接收到消息生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
(2)生产者发送消息
/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, 
InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.se
rialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.
serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>
(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = new 
ProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());//4.关闭消息通道 必须要关闭,否则消息发送不成功producer.close();}
}
(3)消费者接收消息
/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : 
consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}
使用情景:
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息(一对多)

springboot集成kafka入门

1.导入spring-kafka依赖信息
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
2.在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
4.消息消费者

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
  • 接收消息

自媒体文章上下架功能完成

需求分析

已发表且已下架的文章可以上架

流程说明

接口定义

说明

接口路径

/api/v1/news/down_or_up

请求方式

POST

参数

DTO

响应结果

ResponseResult

DTO
@Data
public class WmNewsDto {private Integer id;/*** 是否上架 0 下架 1 上架*/private Short enable;}
ResponseResult

自媒体文章上下架-功能实现

接口定义

water-wemedia工程

在water-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return null;
}

在WmNewsDto中新增enable属性,完整的代码如下:

@Data
public class WmNewsDto {private Integer id;/*** 标题*/private String title;/*** 频道id*/private Integer channelId;/*** 标签*/private String labels;/*** 发布时间*/private Date publishTime;/*** 文章内容*/private String content;/*** 文章封面类型 0 无图 1 单图 3 多图 -1 自动*/private Short type;/*** 提交时间*/private Date submitedTime; /*** 状态 提交为1 草稿为0*/private Short status;/*** 封面图片列表 多张图以逗号隔开*/private List<String> images;/*** 上下架 0 下架 1 上架*/private Short enable;
}
业务层编写

在WmNewsService新增方法

/*** 文章的上下架* @param dto* @return*/
public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/*** 文章的上下架* @param dto* @return*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews = getById(dto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不
存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章
不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){update(Wrappers.
<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
.eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
控制器
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}
测试
消息通知article端文章上下架
在water-common模块下导入kafka依赖
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
在自媒体端的nacos配置中心配置kafka的生产者
spring:kafka:bootstrap-servers: localhost:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONS
tring(map));
}
常量类:
public class WmNewsMessageConstants {public static final String 
WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
在article端编写监听,接收数据
@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}

修改ap_article_config表的数据新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}
实现类:
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends 
ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements 
ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.
<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articl
eId")).set(ApArticleConfig::getIsDown,isDown));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1522914.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

c++(list)

目录 迭代器 sort(随机迭代器)​编辑 list(双向迭代器) vector(随记迭代器) find(input迭代器--只读--可传任意类型迭代器) ​编辑 尾插 push_back/emplace_back 插入数据 删除 交换(swap) 排序 链表合并(merge) 删除(remove) 剪切(splice) 去重(un…

Opencv中的直方图(3)直方图比较函数compareHist()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 比较两个直方图。 函数 cv::compareHist 使用指定的方法比较两个密集或两个稀疏直方图。 该函数返回 d ( H 1 , H 2 ) d(H_1, H_2) d(H1​,H2​…

巧用xrename批量重命名下载的影视文件

网上下载了个电视剧&#xff0c;可是文件名比较长&#xff0c;而且是集数用中文表示的&#xff0c;排序都是乱的。期望的是&#xff1a; 1.文件名改短 2.中文的数字改成阿拉伯数字 看下原始文件名&#xff1a; 期望将文件名改短&#xff0c;例如&#xff1a; 修改前&#xff…

Golang环境安装、配置详细

Windows下安装Go开发环境 点我下载 Windows配置Go环境变量 出现工具install失败时&#xff0c;切换其它代理 # 1. 七牛 CDN go env -w GOPROXYhttps://goproxy.cn,direct# 2. 阿里云 go env -w GOPROXYhttps://mirrors.aliyun.com/goproxy/,direct# 3. 官方 go env -w GOP…

硬件工程师笔试面试知识器件篇——电阻

目录 1、电阻 1.1 基础 电阻原理图 阻实物图 1.1.1、定义 1.1.2、工作原理 1.1.3、类型 1.1.4、材料 1.1.5、标记 1.1.6、应用 1.1.7、特性 1.1.8、测量 1.1.9、计算 1.1.10、颜色编码 1.1.11、公差 1.1.12、功率 1.1.13、重要性 1.2、相关问题 1.2.1、电阻…

电路分析 ---- 同相比例放大器和电压跟随器

1 同相比例运算放大器 同相比例运算放大电路引入了电压串联负反馈&#xff0c;故可以认为输入电阻无穷大&#xff0c;输出电阻为0 分析过程 虚短&#xff0c; u N u P u I u_{N}u_{P}u_{I} uN​uP​uI​ i F i R → u o − u N R f u N − 0 R → u o − u I R f u I R i…

【前端面试】leetcode指针解法javascript

最大盛水 https://leetcode.cn/problems/container-with-most-water/ var maxArea = function(height) {// 左右指针靠拢let left = 0;let right = height.length-1;let maxArea = 0; while(left<right){// 计算出 当前的容积 与最大容积比较,取出最大的const currentAre…

算法数学加油站:一元高斯分布(正态分布)Python精美科研绘图(PDF、CDF、PPF、ECDF曲线;QQ图)

这类博客针对算法学习时可能遇到的数学知识补充&#xff0c;但不会太多废话&#xff0c;主要是公式结合Python代码精美绘图理解&#xff01; 本期重点&#xff1a; 参数&#xff1a;期望、标准差曲线&#xff1a;概率密度曲线PDF、累积概率密度函数CDF、百分点函数PPF应用&am…

【Webpack】基本使用方法

参考视频&#xff1a; 30 分钟掌握 Webpack_哔哩哔哩_bilibili 什么是webpack 简单来说就是一个 打包工具&#xff0c; 可以将互相依赖的html、css、js以及图片字体等资源文件&#xff0c;经过处理打包成一个可执行的项目文件 &#x1f330;看例子 环境初始化 在需要使用…

C语言 09 流程控制

if 如果需要判断某个条件&#xff0c;当满足此条件时&#xff0c;才执行某些代码&#xff0c;那这个时候该怎么办呢&#xff1f;可以使用if语句来实现&#xff1a; #include <stdio.h>int main() {int i 0;// 只希望i大于10的时候才执行下面的打印语句if (i > 10) …

YUM配置文件开启缓存

可设置一台服务器或者云主机作为外网YUM源&#xff0c;并且开启yum在线下载缓存&#xff0c;之后可将服务器所有安装包缓存同步到内网本地。 # vim /etc/yum.conf 将 “keepcache0” 改为 “keepcache1” 缓存目录为 /var/cache/yum/xxx/xxx/xxx 日后可下载到本地然后上传到…

024集—— 正则表达式、replace、DateTime日期的用法——C#学习笔记

DateTime 是一个struct结构体。 代码如下&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace ConsoleApp1 {internal class Program{static void Main(string[] args){args new s…

HNU OS实验六

本内容针对湖南大学特色os实验前言 — os2024 lab 文档

ubuntu驱动掉了,重装nvidia驱动

跑深度学习&#xff0c;忽然发现显卡驱动掉了 主要根据这篇文章&#xff1a;[超级详细系列]ubuntu22.04配置深度学习环境(显卡驱动CUDAcuDNNPytorch)--[1]安装显卡驱动_ubuntu22 cuda cudnn pytorch-CSDN博客 用里面的在线安装方法不行&#xff0c;换成用2.2 离线安装方法。从…

动态路由和路由导航守卫及其案例分析

为什么需要动态路由&#xff1f; 动态路由其实用的不多&#xff0c;在实际开发中&#xff0c;如果遇到权限分配问题&#xff0c;比如对于一个公司人员的后台管理系统&#xff0c;那对不同成员的权限肯定不同&#xff0c;对于人事部&#xff0c;他们有权限进入成员表对人员的流…

云计算实训41——部署project_exam_system项目(续)

# 创建脚本&#xff0c;可以在java环境中运行任何的jar包或者war包#!/bin/bash/usr/local/jdk/bin/java -jar /java/src/*.?ar一、思路分析 &#xff08;1&#xff09;nginx 1、下载镜像&#xff0c;将本地的dist项目的目录挂载在容器的/usr/share/nginx/html/ 2、启动容器 …

性能工具之 JMeter ajax 简单登录案例实战

文章目录 一、前言二、前置工作三、登陆密码分析四、JMeter脚本开发四、登陆性能分析五、小结 一、前言 想起论语中的 “学而时习之不亦说乎” &#xff0c;也想找个开源项目实战一把&#xff0c;下面用一个开源ERP系统中的登陆做今天的实战。 二、前置工作 开源ERP项目地址…

getLocation:fail, the permission value is offline verifying

getLocation:fail, the permission value is offline verifying 后端会根据appid和secret生成 签名&#xff0c;前端wx配置时一定用appid来验证签名的正确 本次错误为配置初始化失败&#xff1a;前端与后端的appId不一致&#xff0c;我的失误也

IP 协议详解

一、认识 IP 地址与网络层的职责 网络层是OSI七层模型中的第三层&#xff0c;也是TCP/IP四层模型中的网络接入层。在这一层&#xff0c;数据包被封装并加上IP层的头部信息&#xff0c;以便在网络之间传输。网络层的主要功能包括路由选择、分段与重组、拥塞控制以及IP地址管理等…

stm32的内部时钟源 | RC震荡电路

文章目录 前言学习了解 前言 了解到 内部高速RC振荡器&#xff08;HSI&#xff09;就是RC震荡器实现的&#xff0c;故想对RC震荡做些了解与分析。 学习了解 【不需要晶振&#xff0c;也可产生时钟脉冲&#xff01;RC振荡器的工作原理&#xff0c;维恩电桥振荡器&#xff01;…