xxljob分片广播+多线程实现高效定时同步elasticsearch索引库

需求:为了利用elasticsearch实现高效搜索,需要将mysql中的数据查出来,再定时同步到es里,同时在同步过程中通过分片广播+多线程提高同步数据的效率。

1. 添加映射

  • 使用kibana添加映射
PUT  /app_info_article
{"mappings":{"properties":{"id":{"type":"long"},"publishTime":{"type":"date"},"layout":{"type":"integer"},"images":{"type":"keyword","index": false},"staticUrl":{"type":"keyword","index": false},"authorId": {"type": "long"},"authorName": {"type": "keyword"},"title":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"content":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"all":{"type": "text","analyzer": "ik_max_word"}}}
}
  • 使用http请求
    PUT请求添加映射:http://192.168.200.130:9200/app_info_article
    GET请求查询映射:http://192.168.200.130:9200/app_info_article
    DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
    GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
    在这里插入图片描述

2. springboot测试

引入依赖

<!--elasticsearch--><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.12.1</version><exclusions><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-smile</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></exclusion></exclusions></dependency>

elasticsearch配置

#自定义elasticsearch连接配置
elasticsearch:host: 192.168.200.131port: 9200

elasticsearch配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {private String host;private int port;@Beanpublic RestHighLevelClient client(){return new RestHighLevelClient(RestClient.builder(new HttpHost(host,port,"http")));}
}

实体类

@Data
public class SearchArticleVo {// 文章idprivate Long id;// 文章标题private String title;// 文章发布时间private Date publishTime;// 文章布局private Integer layout;// 封面private String images;// 作者idprivate Long authorId;// 作者名词private String authorName;//静态urlprivate String staticUrl;//文章内容private String content;}

测试。测试成功后别忘了删除app_info_article的所有文档

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {@Autowiredprivate ApArticleMapper apArticleMapper;@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 注意:数据量的导入,如果数据量过大,需要分页导入*  1)查询数据库数据*  2)将数据写入到ES中即可*     创建BulkRequest*            ================================*            ||A:创建XxxRequest*            ||B:向XxxRequest封装DSL语句数据*            ||                             X C:使用RestHighLevelClient执行远程请求*            ================================*            将XxxRequest添加到BulkRequest*       使用RestHighLevelClient将BulkRequest添加到索引库* @throws Exception*/@Testpublic void init() throws Exception {//1)查询数据库数据List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);}}

3. 核心代码

3.1 xxljob配置

xxl:job:accessToken: default_tokenadmin:addresses: http://127.0.0.1:8080/xxl-job-adminexecutor:address: ''appname: hmttip: ''logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30port: 9998
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}

3.2 elasticsearch配置

和前面一样

3.3 任务编写

@Component
public class SyncIndexTask {@Autowiredprivate IArticleClient articleClient;@Autowiredprivate RestHighLevelClient restHighLevelClient;//线程池public static ExecutorService pool = Executors.newFixedThreadPool(10);/**** 同步索引任务*  1)当数量大于100条的时候,才做分片导入,否则只让第1个导入即可*      A:查询所有数据量 ->searchTotal total>100 [判断当前分片不是第1个分片]*      第N个分片执行数据处理范围-要计算   确定当前分片处理的数据范围  limit #{index},#{size}*                                                                [index-范围]**      B:执行分页查询-需要根据index判断是否超过界限,如果没有超过界限,则并开启多线程,分页查询,将当前分页数据批量导入到ES**      C:在xxl-job中配置作业-策略:分片策略**/@XxlJob("syncIndex")public void syncIndex()  {//1、获取任务传入的参数   {"minSize":100,"size":10}String jobParam = XxlJobHelper.getJobParam();Map<String,Integer> jobData = JSON.parseObject(jobParam,Map.class);int minSize = jobData.get("minSize"); //分片处理的最小总数据条数int size =  jobData.get("size"); //分页查询的每页条数   小分页//2、查询需要处理的总数据量  total=IArticleClient.searchTotal()Long total = articleClient.searchTotal();//3、判断当前分片是否属于第1片,不属于,则需要判断总数量是否大于指定的数据量[minSize],大于,则执行任务处理,小于或等于,则直接结束任务int cn = XxlJobHelper.getShardIndex(); //当前节点的下标if(total<=minSize && cn!=0){//结束return;}//4、执行任务   [index-范围]   大的分片分页处理//4.1:节点个数int n = XxlJobHelper.getShardTotal();//4.2:当前节点处理的数据量int count = (int) (total % n==0? total/n :  (total/n)+1);//4.3:确定当前节点处理的数据范围//从下标为index的数据开始处理  limit #{index},#{count}int indexStart = cn*count;int indexEnd = cn*count+count-1; //最大的范围的最后一个数据的下标//5.小的分页查询和批量处理int index =indexStart; //第1页的indexSystem.out.println("分片个数是【"+n+"】,当前分片下标【"+cn+"】,处理的数据下标范围【"+indexStart+"-"+indexEnd+"】");do {//=============================================小分页================================//5.1:分页查询//5.2:将数据导入ESpush(index,size,indexEnd);//5.3:是否要查询下一页 index+sizeindex = index+size;}while (index<=indexEnd);}/*** 数据批量导入* @param index* @param size* @param indexEnd* @throws IOException*/public void push(int index,int size,int indexEnd)  {pool.execute(()->{System.out.println("当前线程处理的分页数据是【index="+index+",size="+(index+size>indexEnd? indexEnd-index+1 : size)+"】");//1)查询数据库数据List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, index+size>indexEnd? indexEnd-index+1 : size);  //size可能越界//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}});}
}

3.4 xxl-admin新增任务

  • 新增执行器 这里的appName要和XxlJobConfig 中的appname保持一致。也可以不用新增执行器,执行器就相当于一个分组的作用。
    在这里插入图片描述
  • 新增任务
    在这里插入图片描述

4. 模拟集群,运行项目

  • 运行3个SearchApplication项目,设置xxl.job.executor.port分别为9998,9997,9996
    在这里插入图片描述
  • 执行一次任务
    在这里插入图片描述
  • 查看结果,3个application都成功了
    在这里插入图片描述
  • kibana查看是否有数据,发现有18条,mysql的数据全部被导入了

在这里插入图片描述

5. 总结

  • xxljob分片广播:假如一共有1000条数据,有3个节点上运行着SearchApplication服务。那么每个节点需要同步的数据总条数为334,334,332条。
    在这里插入图片描述

  • 分页查询:节点0的任务总条数为334条,那么需要做分页(假设分页size为20)查询的次数为17次,每查1次后,将查到的数据通过restHighLevelClient发送到es中。

do {//5.1:分页查询//5.2:将数据导入ESpush(index,size,indexEnd);  //分页查询+导入es的操作//5.3:是否要查询下一页 index+sizeindex = index+size;
}while (index<=indexEnd);
  • 多线程:上述代码,push方法包括了分页查询+导入es的操作,是低效的。并且push方法不结束的话,下一页的操作不会开始。这里可以用多线程,每次push的时候都开启一个新线程,这样每一页的操作都是独立的,可以同时查询,同时导入到es,不会互相影响。这里使用了线程池。
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index,int size,int indexEnd)  {pool.execute(()->{//分页查询+导入到es});}
  • elasticsearch的相关api:参考我另一篇博客 项目中使用Elasticsearch的API相关介绍
//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1424522.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

传输层协议——TCP协议

TCP协议又叫传输控制协议&#xff0c;TCP/IP协议是计算机通信网络中目前使用最多的协议&#xff0c;同时也融入了生活的方方面面&#xff0c;不管是浏览网页使用的http/https协议、物联网设备使用的MQTT/MQTTS协议与下载文件使用的ftp协议、工业以太网中使用的Modbus TCP协议等…

不用投稿邮箱,怎样向各大新闻媒体投稿?

身为单位的信息宣传员,我深知肩上责任重大。每个月,完成单位在媒体上投稿发表文章的考核任务,就如同一场无声的赛跑,既要保证速度,更要注重质量。起初,我遵循“前辈们”的老路,一头扎进了邮箱投稿的海洋。但很快,现实给了我一记重拳——邮箱投稿的竞争犹如千军万马过独木桥,稿件…

维护表空间中的数据文件

目录 向表空间中添加数据文件 从表空间中删除数据文件 删除users表空间中的users02.dbf数据文件 对数据文件的自动扩展设置 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 维护表空间中的数据文件主要包括向表空间中添…

单位个人怎样向报社的报纸投稿?

作为一名单位的信息宣传员,我肩负着每月定期在媒体上投稿发表文章的重任。然而,在投稿的道路上,我经历了不少波折和挫折。 一开始,我天真地以为只要将稿件发送到报社的投稿邮箱,就能轻松完成任务。然而,现实却远比我想象的复杂。邮箱投稿的竞争异常激烈,编辑们会在众多稿件中挑…

【图神经网络——消息传递】

消息传递机制 画图先&#xff1a;导包&#xff1a;画图&#xff1a; 实现消息传递&#xff1a;例子一&#xff1a;例子二&#xff1a; 画图先&#xff1a; 导包&#xff1a; import networkx as nx import matplotlib.pyplot as plt import torch from torch_geometric.nn im…

tensorrtx-yolov5-v6.0部署在windows系统

前言&#xff1a;最近几天一直在搞这个东西&#xff0c;现在跑通了&#xff0c;为了以后自己看和帮助他人&#xff0c;就记录一下。虽然是跑通了但是觉得怪怪的&#xff0c;感觉不是自己想要的效果&#xff0c;另外这个只能检测图片&#xff0c;不能摄像头实时监测(我暂时没找到…

震撼发布!GPT-4o 上线!

5 月 14日凌晨一点&#xff0c;OpenAI 发布了 GPT-4o&#xff01; 新模型的功能简单概括就是&#xff1a;更快、更智能、更像人类。 秉承着持续更新的态度&#xff0c;Hulu AI 快速接入 GPT-4o 啦&#xff01; 继 5 月份上线 Suno 之后&#xff0c;这次是 Hulu AI 的又一重大…

Linux---编辑器vim的认识与简单配置

前言 我们在自己的电脑上所用的编译软件&#xff0c;就拿vs2022来说&#xff0c;我们可以在上面写C/C语言、python、甚至java也可以在上面进行编译&#xff0c;这种既可以用来编辑、运行编译&#xff0c;又可以支持很多种语言的编译器是一种集成式开发环境&#xff0c;集众多于…

MP3解码入门(基于libhelix)

主要参考资料: 【Arduino Linux】基于 Helix 解码库实现 MP3 音频播放: https://blog.csdn.net/weixin_42258222/article/details/122640413 libhelix-mp3: https://github.com/ultraembedded/libhelix-mp3/tree/master 目录 一、MP3文件二、MP3 解码库三、libhelix-mp3库3.1 …

如何去除字符串两侧的空白字符?

TRIM函数会去掉字符串左侧和右侧的空格&#xff0c;语法是&#xff1a;TRIM(字符串) excel中&#xff0c;TRIM函数能去掉字符串左侧和右侧的空格&#xff0c;它的ASCII码是32。 以下设定一个字符串组合&#xff0c;它的第一个字符中空格&#xff0c;最后一个字符是换行符 &q…

Pathlib,一个不怕迷路的 Python 向导

大家好&#xff01;我是爱摸鱼的小鸿&#xff0c;关注我&#xff0c;收看每期的编程干货。 一个简单的库&#xff0c;也许能够开启我们的智慧之门&#xff0c; 一个普通的方法&#xff0c;也许能在危急时刻挽救我们于水深火热&#xff0c; 一个新颖的思维方式&#xff0c;也许能…

年度更新!统信UOS服务器版V20(1070)超越期待

不负广大客户期待&#xff01; 统信UOS服务器版V20&#xff08;1070&#xff09;年度首更 功能更强大、性能更卓越、生态更丰富 助您畅享安全、便捷、高效的产品和服务 新平台&#xff0c;新生态 统信UOS服务器版始终坚持进行生态适配&#xff0c;目前已支持超过百万种兼容…

Python 全栈体系【四阶】(四十四)

第五章 深度学习 九、图像分割 3. 常用模型 3.4 DeepLab 系列 3.4.3 DeepLab v3&#xff08;2017&#xff09; 在DeepLab v3中&#xff0c;主要进行了以下改进&#xff1a; 使用更深的网络结构&#xff0c;以及串联不同膨胀率的空洞卷积&#xff0c;来获取更多的上下文信…

使用TerraScan静态扫描KubernetsIaC文件

terrascan https://github.com/tenable/terrascan Terrascan 是基础架构即代码的静态代码分析器。Terrascan 允许&#xff1a; 将基础架构作为代码无缝扫描&#xff0c;以查找错误配置。监控已配置的云基础架构&#xff0c;以查找引入终端安全评估漂移的配置更改&#xff0…

Java面试八股之Collection和Collections的区别

Java中Collection和Collections的区别 Collection 是一个接口&#xff0c;位于 java.util 包中&#xff0c;它是 Java 集合框架的顶层接口之一&#xff0c;代表了一组对象的集合。Collection 接口定义了所有集合类型&#xff08;如 List、Set、Queue 等&#xff09;所共有的基…

每周一算法:恰好经过K条边的最短路

题目描述 牛站 给定一张由 M M M 条边构成的无向图&#xff0c;点的编号为 1 ∼ 1000 1\sim 1000 1∼1000 之间的整数。 求从起点 S S S 到终点 E E E 恰好经过 K K K 条边&#xff08;可以重复经过&#xff09;的最短路。 注意: 数据保证一定有解。 输入格式 第 1 …

AquaCrop模型运行及结果分析、代码解析;气象、土壤、作物和管理措施等数据的准备和输入;农业水资源管理

目录 专题一 模型原理与数据要求 专题二 模型数据准备 专题三 模型运行及结果分析 专题四 参数分析 专题五 源代码分析 更多应用 AquaCrop是由世界粮食及农业组织&#xff08;FAO&#xff09;开发的一个先进模型&#xff0c;旨在研究和优化农作物的水分生产效率。这个模型…

海外住宅IP适用场景

海外住宅IP是指海外互联网服务提供商分配给海外家庭的IP地址&#xff0c;通常长期部署在特定区域的公网中&#xff0c;网络连接稳定&#xff0c;能够快速浏览指定网页内容&#xff0c;请求响应成功。它的适用场景十分广泛&#xff0c;接下来将详细介绍哪些领域会使用海外住宅IP…

错误: 找不到或无法加载主类问题(已解决)

今天在虚拟机中安装了idea2023.2的版本&#xff0c;运行代码时发现错误找不到主类&#xff01; 直接说结论&#xff1a; 我先clean了一下target&#xff0c;然后重新build&#xff0c;发现maven报错了&#xff0c;idea2023.2默认使用了内置的maven&#xff0c;然后我切换了一下…

垃圾分类管理系统java项目

文章目录 垃圾分类管理系统一、项目演示二、项目介绍三、系统部分功能截图四、部分代码展示五、底部获取项目&#xff08;9.9&#xffe5;带走&#xff09; 垃圾分类管理系统 一、项目演示 垃圾分类管理系统 二、项目介绍 系统角色&#xff1a;管理员、用户 1、登录、注册功能…