文章目录
- 前言
- 11 自动补全
- 11.1 拼音分词器
- 11.2 自定义分词器
- 11.3 自动补全查询
- 12 数据同步
- 12.1 实现方案
- 12.1.1 同步调用
- 12.1.2 异步通知
- 12.1.3 监听binlog
- 12.2 异步通知实现数据同步
- 12.2.1 声明交换机和队列
- 12.2.2 发送MQ消息
- 12.2.3 接收MQ消息并操作ES
前言
ElasticSearch学习笔记(一)倒排索引、ES和Kibana安装、索引操作
ElasticSearch学习笔记(二)文档操作、RestHighLevelClient的使用
ElasticSearch学习笔记(三)RestClient操作文档、DSL查询文档、搜索结果排序
ElasticSearch学习笔记(四)分页、高亮、RestClient查询文档
ElasticSearch学习笔记(五)Bucket聚合、Metric聚合
11 自动补全
在搜索页面,当用户在搜索框输入字符时,应该提示出与该字符有关的搜索项。例如:
这种根据用户输入的字母,提示完整词条的功能,就是自动补全。由于需要根据拼音字母来推断,因此要用到拼音分词功能。
11.1 拼音分词器
要实现根据拼音字母做自动补全,就必须对文档按照拼音分词。
在GitHub上下载elasticsearch的拼音分词插件,地址:https://github.com/medcl/elasticsearch-analysis-pinyin
- 1)将下载的
elasticsearch-analysis-pinyin-7.12.1.zip
上传到服务器并解压
- 2)将插件移动到ES的插件目录
/var/lib/docker/volumes/es-plugins/_data/
下
- 3)重启ES
- 4)功能测试
11.2 自定义分词器
默认的拼音分词器会将每个汉字单独分为拼音,但仍然不能满足需求,我们希望的是每个词条形成一组拼音。为此需要对拼音分词器做个性化定制,形成自定义分词器。
ES分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理,例如删除字符、替换字符等;
- tokenizer:将文本按照一定的规则切割成词条(term),例如keyword(不分词)、ik_smart等;
- tokenizer filter:将tokenizer输出的词条做进一步处理,例如大小写转换、同义词处理、拼音处理等。
声明自定义分词器的DSL语法如下:
PUT /test
{"settings": {"analysis": {"analyzer": {"my_analyzer": { // 自定义分词器名称"tokenizer": "ik_max_word", // 词条切割规则"filter": "py" // 自定义的处理器}},"filter": {"py": { // 自定义处理器的具体实现,使用拼音处理"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}
功能测试:
11.3 自动补全查询
ES提供了Completion Suggester
查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型也有一些约束:
- 参与补全查询的字段必须是
completion
类型。 - 字段的内容一般是用来补全的多个词条形成的数组。
例如,把酒店的品牌、城市、商圈等信息放入一个completion
类型的字段中,作为自动补全的提示。
- 1)由于已经创建好的索引库是无法修改的,因此要删除然后重新创建
DELETE /hotel
- 2)修改索引库结构,主要做如下改动:设置自定义拼音分词器;修改
name
、all
字段,使用自定义分词器;添加一个新字段suggestion
,类型为completion
类型,使用自定义的分词器内容
// 酒店数据索引库
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {// 设置自定义拼音分词器"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},// 使用自定义分词器"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},// 使用自定义分词器"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},// 添加一个新字段suggestion,类型为completion类型"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}
- 3)给
HotelDoc
类添加suggestion
字段,内容包含brand
、business
、city
public class HotelDoc {// ......private String brand;private String city;private String business;private List<String> S;public HotelDoc(Hotel hotel) {// ......this.brand = hotel.getBrand();this.city = hotel.getCity();this.business = hotel.getBusiness();// 组装suggestionif(this.business.contains("/")){// business有多个值,需要切割String[] arr = this.business.split("/");// 添加元素this.suggestion = new ArrayList<>();this.suggestion.add(this.brand);this.suggestion.add(this.city);Collections.addAll(this.suggestion, arr);} else {this.suggestion = Arrays.asList(this.brand, this.business, this.city);}}
}
- 4)重新导入数据到hotel索引库
- 5)DSL实现自动补全查询
GET /hotel/_search
{"suggest": {"mySuggestion": { // 自定义名字"text": "sh", // 关键字"completion": {"field": "suggestion", // 要补全的字段"skip_duplicates": true, // 跳过重复项"size": 10 // 查询10条数据}}}
}
- 5)RestAPI实现自动补全查询
@Test
public void testSuggestion() throws IOException {// 1.创建Request对象SearchRequest request = new SearchRequest("hotel");// 2.准备参数request.source().suggest(new SuggestBuilder().addSuggestion("mySuggestion", // 自定义名字SuggestBuilders.completionSuggestion("suggestion") // 要补全的字段.prefix("sh") // 关键字.skipDuplicates(true) // 跳过重复项.size(10)) // 查询10条数据);// 3.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.处理结果Suggest suggest = response.getSuggest();// 根据名称获取补全结果CompletionSuggestion mySuggestion = suggest.getSuggestion("mySuggestion");// 获取options并遍历for (CompletionSuggestion.Entry.Option option : mySuggestion.getOptions()) {// 获取option中的textString text = option.getText().string();System.out.println(text);}
}
执行以上单元测试,得到如下结果:
12 数据同步
ES中的酒店数据来源于MySQL数据库,因此MySQL数据发生改变时,ES也必须跟着改变,这个就是ES与MySQL之间的数据同步。
12.1 实现方案
12.1.1 同步调用
如上图所示,hotel-demo酒店搜索服务对外提供了一个接口。hotel-admin酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,修改ES中的数据。这种方式实现简单,但业务耦合度较高。
12.1.2 异步通知
如上图所示,hotel-admin酒店管理服务在完成数据库操作后,发送对应的MQ消息到队列。hotel-demo酒店搜索服务监听MQ,接收到消息后完成ES数据修改。这种方式实现难度一般,且低耦合,但对MQ的可靠性依赖较高。
12.1.3 监听binlog
如上图所示,MySQL开启了binlog功能,hotel-admin酒店管理服务完成增、删、改操作都会记录在binlog中
。hotel-demo酒店搜索服务基于canal监听binlog变化,实时更新ES中的内容。这种方式完全解除服务间耦合,但开启binlog会增加数据库负担,且实现复杂度高。
12.2 异步通知实现数据同步
12.2.1 声明交换机和队列
使用docker安装rabbitmq的方法参考:RabbitMQ学习笔记(一)RabbitMQ部署、5种队列模型
- 1)引入依赖
<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 2)声明交换机和队列的名称
public class MqConstants {/*** 交换机名称*/public final static String HOTEL_EXCHANGE = "hotel.topic";/*** 新增和修改的队列名称*/public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";/*** 删除的队列名称*/public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public final static String HOTEL_INSERT_KEY = "hotel.insert";/*** 删除的RoutingKey*/public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
- 3)声明交换机和队列
package com.star.sc.totel.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);}@Beanpublic Queue insertQueue(){return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);}@Beanpublic Queue deleteQueue(){return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);}@Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);}@Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}
}
12.2.2 发送MQ消息
@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSaveHotel() {Hotel hotel = new Hotel();hotel.setId(2359697L);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
执行以上单元测试,向rabbitmq发送消息,在管理页面可以看到这条消息:
12.2.3 接收MQ消息并操作ES
package com.star.sc.totel.mq;import com.alibaba.fastjson.JSON;
import com.star.sc.totel.pojo.Hotel;
import com.star.sc.totel.pojo.HotelDoc;
import com.star.sc.totel.service.IHotelService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class HotelListener {@Autowiredprivate RestHighLevelClient client;@Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)public void listenHotelInsertOrUpdate(Long id) throws IOException {System.out.println("监听到酒店新增或修改的业务,id=" + id);// 1.根据id查询酒店数据Hotel hotel = hotelService.getById(id);HotelDoc hotelDoc = new HotelDoc(hotel);// 2.发送请求IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);client.index(request, RequestOptions.DEFAULT);}/*** 监听酒店删除的业务* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)public void listenHotelDelete(Long id){System.out.println("监听到酒店删除的业务,id=" + id);}}
启动该监听器,日志显示读取到了MQ消息:
…
本节完,更多内容请查阅分类专栏:微服务学习笔记
感兴趣的读者还可以查阅我的另外几个专栏:
- SpringBoot源码解读与原理分析
- MyBatis3源码深度解析
- Redis从入门到精通
- MyBatisPlus详解
- SpringCloud学习笔记