day07 Elasticsearch搜索引擎3

1、数据聚合

聚合(aggregations)可以让我们极其方便的实现对文档数据的统计、分析、运算。例如:

  • 什么品牌的手机最受欢迎?
  • 这些手机的平均价格、最高价格、最低价格?
  • 这些手机每月的销售情况如何?

实现这些统计功能比数据库的sql要方便得多,而且查询速度非常快,可以实现实时搜索效果。

1.1、聚合的分类

聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组

    • TermAggregation:按照文档字段值进行分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯进行分组,例如一周为一组,或者一个月为一组、一个季度为一组等等

  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求 max、min、avg、sum 等
  • 管道(pipeline)聚合:以其它聚合的结果为基础做聚合

注意:参加聚合的字段必须是 keyword、日期、数值、布尔类型,也就是聚合的字段一定是不能分词的

DSL实现聚合

现在,我们要统计所有数据中的酒店品牌有几种,其实就是按照品牌对数据进行分组。此时可以根据酒店品牌的名称做聚合,也就是Bucket聚合。

Bucket聚合语法

基本语法:

GET /hotel/_search
{"size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果"aggs": { // 定义聚合"brandAgg": { // 给聚合起个名字"terms": { // 聚合的类型,按照品牌值聚合,所以选择term"field": "brand", // 参与聚合的字段"size": 20 // 希望获取的聚合结果数量}}}
}

示例:

# 聚合功能
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}}
}

聚合结果排序

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为 _count*,*并且按照 *_*count 降序排序。

我们可以指定 order 属性,自定义聚合的排序方式:

GET /hotel/_search
{"size": 0, "aggs": {"brandAgg": {"terms": {"field": "brand","order": {"_count": "asc" // 按照_count升序排列},"size": 20}}}
}

示例:

# 聚合功能,自定义排序规则
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"_count": "asc"}}}}
}

限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合是对搜索结果做聚合,那么聚合必须添加限定条件。

我们可以限定要聚合的文档范围,只需要添加 query 条件即可:

GET /hotel/_search
{"query": {"range": {"price": {"lte": 200 // 只对200元以下的文档聚合}}}, "size": 0, "aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}}
}

示例:

# 聚合功能,限定聚合范围
GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}}, "size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}}
}

Metric聚合语法

上面我们对酒店按照品牌进行分组,形成了一个个桶。现在我们需要对桶内的酒店做运算,获取每个品牌的用户评分的 min、max、avg 等值。

这就要用到 Metric 聚合了,例如 stats 聚合:就可以获取 min、max、avg 等结果

基本语法:

GET /hotel/_search
{"size": 0, "aggs": {"brandAgg": { "terms": { "field": "brand", "size": 20},"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算"score_stats": { // 聚合名称"stats": { // 聚合类型,这里stats可以计算 min、max、avg 等"field": "score" // 聚合字段,这里是score}}}}}
}

示例:

# 嵌套聚合metric
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"scoreAgg.avg": "desc"}},"aggs": {"scoreAgg": {"stats": {"field": "score"}}}}}
}

这次的 score_stats 聚合是在 brandAgg 的聚合内部嵌套的子聚合,因此我们需要在每个桶分别计算。

另外,我们还可以对聚合结果做个排序,例如按照每个桶的酒店平均分做排序:

1.3、RestAPI实现聚合

API语法

聚合条件与 query 条件同级别,因此需要使用 request.source() 来指定聚合条件。

聚合条件的语法:

聚合的结果也和查询结果不同,API也比较特殊。不过同样是JSON逐层解析:

示例:

     @Testvoid testAggregation() throws IOException {// 1.创建Request对象SearchRequest request = new SearchRequest("hotel");// 2.准备DSL// 2.1、设置sizerequest.source().size(0);// 2.1、聚合request.source().aggregation(AggregationBuilders// 聚合的名称.terms("brandAgg")// 聚合的字段.field("brand")// 聚合的数量.size(10));// 3.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析聚合结果Aggregations aggregations = response.getAggregations();// 4.1、根据聚合名称获取聚合结果Terms brandTerms = aggregations.get("brandAgg");// 4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();// 4.3、遍历每一个bucketfor (Terms.Bucket bucket : buckets) {// 4.4、获取keyString key = bucket.getKeyAsString();System.out.println("key = " + key);}}

业务需求

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

分析:

目前,页面的城市列表、星级列表、品牌列表都是写死的,并不会随着搜索结果的变化而变化。但是用户搜索条件改变时,搜索结果会跟着变化。

例如:用户搜索“东方明珠”,那搜索的酒店肯定是在上海东方明珠附近,因此,城市只能是上海,此时城市列表中就不应该显示北京、深圳、杭州这些信息了。

也就是说,搜索结果中包含哪些城市,页面就应该列出哪些城市;搜索结果中包含哪些品牌,页面就应该列出哪些品牌。

如何得知搜索结果中包含哪些品牌?如何得知搜索结果中包含哪些城市?

使用聚合功能,利用 Bucket 聚合,对搜索结果中的文档基于品牌分组、基于城市分组,就能得知包含哪些品牌、哪些城市了。

因为是对搜索结果进行聚合,因此聚合是限定范围的聚合,也就是说聚合的限定条件跟搜索文档的条件一致。

查看浏览器可以发现,前端其实已经发出了这样的一个请求:

请求参数与搜索文档的参数完全一致。

返回值类型就是页面要展示的最终结果:

结果是一个Map结构:

  • key是字符串,城市、星级、品牌、价格
  • value是集合,例如多个城市的名称

业务实现

cn.itcast.hotel.web 包下的 HotelController 中添加一个方法,遵循下面的要求:

  • 请求方式:POST
  • 请求路径:/hotel/filters
  • 请求参数:RequestParams,与搜索文档的参数一致
  • 返回值类型:Map<String, List<String>>

代码:

     @PostMapping("/filters")public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {return hotelService.filters(params);}

这里调用了 IHotelService 中的 getFilters 方法,尚未实现。

cn.itcast.hotel.service.IHotelService 中定义新方法:

Map<String, List<String>> filters(RequestParams params);

cn.itcast.hotel.service.impl.HotelService 中实现该方法:

     /*** 查询城市、星级、品牌的聚合结果** @param params* @return 聚合结果,格式:{"城市": ["上海", "北京"], "品牌": ["如家", "希尔顿"]}*/@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {// 1.创建Request对象SearchRequest request = new SearchRequest("hotel");// 2.准备DSL// 2.1、querybuildBasicQuery(params, request);// 2.2、设置sizerequest.source().size(0);// 2.1、聚合buildAggregation(request);// 3.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析聚合结果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations, "brandAgg");result.put("品牌", brandList);// 4.2、根据城市名称,获取城市结果List<String> cityList = getAggByName(aggregations, "cityAgg");result.put("城市", cityList);// 4.3、根据星级名称,获取星级结果List<String> starList = getAggByName(aggregations, "starAgg");result.put("星级", starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}private List<String> getAggByName(Aggregations aggregations, String aggName) {// 1、根据聚合名称获取聚合结果Terms brandTerms = aggregations.get(aggName);// 2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();// 3、遍历每一个bucketList<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {// 4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private void buildAggregation(SearchRequest request) {// 对品牌进行聚合request.source().aggregation(AggregationBuilders// 聚合的名称.terms("brandAgg")// 聚合的字段.field("brand")// 聚合的数量.size(100));// 对城市进行聚合request.source().aggregation(AggregationBuilders// 聚合的名称.terms("cityAgg")// 聚合的字段.field("city")// 聚合的数量.size(100));// 对星级进行聚合request.source().aggregation(AggregationBuilders// 聚合的名称.terms("starAgg")// 聚合的字段.field("starName")// 聚合的数量.size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {// 1.构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();// 关键字搜索String key = params.getKey();if (key == null || "".equals(key)) {boolQuery.must(QueryBuilders.matchAllQuery());} else {boolQuery.must(QueryBuilders.matchQuery("all", key));}// 城市条件if (params.getCity() != null && !params.getCity().equals("")) {boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}// 品牌条件if (params.getBrand() != null && !params.getBrand().equals("")) {boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}// 星级条件if (params.getStarName() != null && !params.getStarName().equals("")) {boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}// 价格if (params.getMinPrice() != null && params.getMaxPrice() != null) {boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}// 2.算分控制FunctionScoreQueryBuilder functionScoreQuery =// 构建 function_socre 查询QueryBuilders.functionScoreQuery(// 原始查询,相关性算分的查询boolQuery,// function socre的数组new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{// 其中的一个function socre元素new FunctionScoreQueryBuilder.FilterFunctionBuilder(// 过滤条件QueryBuilders.termQuery("isAD", true),// 算分函数ScoreFunctionBuilders.weightFactorFunction(10))});// 3.放入sourcerequest.source().query(functionScoreQuery);}

2、自动补全

当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:

这种根据用户输入的字母,提示完整词条的功能,就是自动补全了。

因为需要根据拼音字母来推断,因此要用到拼音分词功能。

2.1、拼音分词器

要实现根据字母自动做补全,就必须对文档按照拼音进行分词。在 GitHub 上恰好有 elasticsearch 的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

安装方式与IK分词器一样,分为三步:

1、解压

2、上传到虚拟机中,elasticsearch的 plugin 目录

/var/lib/docker/volumes/es-plugins/_data

3、重启 elasticsearch

4、测试

POST /_analyze
{"text": ["如家酒店还不错"],"analyzer": "pinyin"
}

2.2、自定义分词器

默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,所以需要对拼音分词器做个性化定制,形成自定义分词器。

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在 tokenizer 之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如 keyword,就是不分词;还有 ik_smart
  • tokenizer filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

文档分词时会依次由这三部分来处理文档:

我们可以在创建索引库时,通过 settings 来配置自定义的analyzer(分词器):

PUT /test // 创建一个test索引库
{"settings": {"analysis": {"analyzer": { // 自定义分词器"my_analyzer": {  // 分词器名称"tokenizer": "ik_max_word","filter": "py"}},"filter": { // 自定义tokenizer filter"py": { // 过滤器名称"type": "pinyin", // 过滤器类型,这里是pinyin"keep_full_pinyin": false, // 取消单个字的拼音,例如:刘德华 -> [liu,de,hua]"keep_joined_full_pinyin": true, // 加上全拼功能,例如:刘德华 -> [liudehua]"keep_original": true, // 保留中文"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}}
}

示例:

# 自定义拼音分词器
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer"}}}
}

测试:

POST /test/_analyze
{"text": ["如家酒店还不错"],"analyzer": "my_analyzer"
}

注意:为了避免搜索到同音字,搜索时不要使用拼音分词器

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。

# 插入数据
POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}
# 搜索关键字
GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}

因此字段在创建倒排索引时应该用 my_analyzer 分词器;字段在搜索时应该使用 ik_smart 分词器

# 删除test索引库
DELETE /test# 自定义分词器
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}

再次测试:

2.3、自动补全查询

elasticsearch 提供了 Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高自动补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是 completion 类型。
  • 字段的内容一般是用来补全的多个词条形成的数组。

比如,一个这样的索引库:

# 自动补全的索引库
PUT test2
{"mappings": {"properties": {"title":{"type": "completion"}}}
}

然后插入下面的数据:

# 示例数据
POST test2/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test2/_doc
{"title": ["SK-II", "PITERA"]
}
POST test2/_doc
{"title": ["Nintendo", "switch"]
}

查询语法如下:

// 自动补全查询
GET /test/_search
{"suggest": {"title_suggest": { // 给查询起一个名称"text": "s", // 用户输入的关键字"completion": { // 自动补全的类型"field": "title", // 补全查询的字段"skip_duplicates": true, // 跳过重复的"size": 10 // 获取前10条结果}}}
}

示例:

# 自动补全查询
GET /test2/_search
{"suggest": {"titleSuggest": {"text": "s","completion": {"field": "title","skip_duplicates": true,"size": 10}}}
}

2.4、实现酒店搜索框自动补全

现在,我们的hotel索引库还没有设置拼音分词器,需要修改索引库中的配置。但是我们知道索引库是无法修改的,只能删除然后重新创建。

另外,我们需要添加一个字段,用来做自动补全,将 brand、suggestion、city 等都放进去,作为自动补全的提示。

因此,总结一下,我们需要做的事情包括:

1、修改hotel索引库结构,设置自定义拼音分词器

2、修改索引库的 name、all 字段,使用自定义分词器

3、索引库添加一个新字段 suggestion,类型为 completion 类型,使用自定义的分词器

4、给HotelDoc类添加suggestion字段,内容包含brand、business

5、重新导入数据到hotel库

修改酒店映射结构

# 删除酒店索引库
DELETE /hotel# 酒店索引库
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}

注意:text_anlyzer 是用于全文检索的,需要进行分词(ik_max_word);而 completion_analyzer 是用于自动补全的,不需要进行分词(keyword

这里看下 name、all 字段,"analyzer": "text_anlyzer" 表示在创建倒排索引时用 text_anlyzer"search_analyzer": "ik_smart" 表示在搜索时用 ik_smart

再看看 suggestion ,这个字段是用来做自动补全的,它的类型是 completion,用的分词器是 completion_analyzer,也就是不分词直接转成拼音。

修改HotelDoc实体

HotelDoc 中要添加一个字段,用来做自动补全,内容可以是酒店品牌、城市、商圈等信息。按照自动补全字段的要求,最好是这些字段的数组。

因此我们在 HotelDoc 中添加一个 suggestion 字段,类型为List<String>,然后将 brand、city、business 等信息放到里面。

@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;// 品牌private String brand;private String city;private String starName;// 酒店所在的商圈private String business;private String location;private String pic;// 排序时的距离值private Object distance;// 广告标记private boolean isAD;// 自动补全的数组private List<String> suggestion;// 注意这两个get和set方法需要手动加上,不然自动生成的方法名是没有带get的,会导致广告图片无法正常显示public boolean getisAD() {return isAD;}public void setisAD(boolean AD) {isAD = AD;}public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();// 自动补全字段的处理if (this.business.contains("/")) {// business有多个值,需要切割String[] arr = this.business.split("/");this.suggestion = new ArrayList<>();// 添加品牌this.suggestion.add(this.brand);// 添加商圈Collections.addAll(this.suggestion, arr);} else {this.suggestion = Arrays.asList(this.brand, this.business);}}
}

重新导入数据到es中

重新执行之前编写的导入数据功能:

     /*** 批量新增文档** @throws IOException*/@Testvoid testBulkRequest() throws IOException {// 查询所有的酒店数据List<Hotel> list = hotelService.list();// 1.创建Request对象BulkRequest request = new BulkRequest();// 2.准备参数,添加多个新增的Requestfor (Hotel hotel : list) {// 2.1 转换为文档类型HotelDocHotelDoc hotelDoc = new HotelDoc(hotel);// 2.2 转jsonString json = JSON.toJSONString(hotelDoc);// 2.3 添加请求request.add(new IndexRequest("hotel").id(hotel.getId().toString()).source(json, XContentType.JSON));}// 3.发送请求client.bulk(request, RequestOptions.DEFAULT);}

可以看到新的酒店数据中包含了suggestion:

测试自动补全功能

GET /hotel/_search
{"suggest": {"suggestions": {"text": "h","completion": {"field": "suggestion","skip_duplicates": true,"size": 10}}}
}

RestAPI实现自动补全

请求参数构造的API:

自动补全的结果比较特殊,解析的代码如下:

实现搜索框自动补全

查看前端页面,可以发现当我们在输入框键入时,前端会发起 ajax 请求:

返回值是补全词条的集合,类型为 List<String>

1、在 cn.itcast.hotel.web 包下的 HotelController 中添加新接口,接收新的请求:

     @GetMapping("/suggestion")public List<String> getSuggestions(@RequestParam("key") String prefix) {return hotelService.getSuggestions(prefix);}

2、在 cn.itcast.hotel.service 包下的 IhotelService 中添加方法:

List<String> getSuggestions(String prefix);

3、在 cn.itcast.hotel.service.impl.HotelService 中实现该方法:

     @Overridepublic List<String> getSuggestions(String prefix) {try {// 1.创建Request对象SearchRequest request = new SearchRequest("hotel");// 2.准备DSLrequest.source().suggest(new SuggestBuilder()// 添加一个补全查询的名称.addSuggestion("suggestions",// 自动补全的字段名字SuggestBuilders.completionSuggestion("suggestion")// 自动补全的前缀.prefix(prefix)// 跳过重复的.skipDuplicates(true)// 最多显示10条数据.size(10)));// 3.发起请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析响应结果Suggest suggest = response.getSuggest();// 4.1、根据补全查询的名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");// 4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();// 4.3、遍历optionsList<String> list = new ArrayList<>(options.size());for (CompletionSuggestion.Entry.Option option : options) {// 4.4、获取一个option中的text字段,也就是补全的词条String text = option.getText().toString();list.add(text);}return list;} catch (IOException e) {throw new RuntimeException(e);}}

4、测试:

3、数据同步(面试重点)

elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这就是 elasticsearch 和 mysql 之间的数据同步
问题。

3.1、思路分析

常见的数据同步方案有三种:

  • 同步调用
  • 异步通知
  • 监听binlog

方案一:同步调用

流程如下:

  • hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据
  • 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口

优缺点:

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方案二:异步通知

流程如下:

  • hotel-admin 对 mysql 数据库的数据完成增、删、改后,发送 MQ 消息
  • hotel-demo 监听 MQ,接收到消息后完成对 elasticsearch 数据的修改

优缺点:

  • 优点:低耦合,实现难度一般
  • 缺点:依赖 mq 的可靠性

方案三:监听binlog

流程如下:

  • 给 mysql 开启 binlog 功能
  • mysql 完成增、删、改操作都会记录在 binlog 中
  • hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容

优缺点:

  • 优点:完全解除服务间的耦合
  • 缺点:开启 binlog 会增加数据库负担、实现复杂度高

3.2、实现数据同步

我们以异步通知为例,使用 MQ 消息中间件

思路

利用课前资料提供的 hotel-admin 项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对 elasticsearch 中的数据也要完成相同操作。

步骤:

  • 导入课前资料提供的 hotel-admin 项目,启动并测试酒店数据的 CRUD
  • 声明 exchange、queue、RoutingKey
  • 在 hotel-admin 中的增、删、改业务中完成消息发送
  • 在 hotel-demo 中完成消息监听,并更新 elasticsearch 中的数据
  • 启动并测试数据同步功能

导入demo

导入课前资料提供的 hotel-admin 项目:

运行后,访问 http://localhost:8099

声明交换机和队列

MQ结构如图:

1、引入依赖

在 hotel-admin、hotel-demo 中引入 rabbitmq 的依赖:

<!-- amqp依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置 rabbitmq

在 hotel-admin、hotel-demo 中的 application.yaml 中配置 rabbitmq

spring:rabbitmq:host: rabbitmq服务器ip地址port: 5672username: adminpassword: 283619virtual-host: /

3、声明队列和交换机名称

在 hotel-admin 和 hotel-demo 中的 cn.itcast.hotel.constatnts 包下新建一个类 MqConstants

/*** 声明队列和交换机的名称** @author xiexu* @create 2022-11-17 10:12*/
public class MqConstants {/*** 交换机*/public final static String HOTEL_EXCHANGE = "hotel.topic";/*** 监听新增或修改的队列*/public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";/*** 监听删除的队列*/public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public final static String HOTEL_INSERT_KEY = "hotel.insert";/*** 删除的RoutingKey*/public final static String HOTEL_DELETE_KEY = "hotel.delete";}

4、声明队列和交换机

在 hotel-demo 中定义配置类,声明队列、交换机:

/*** 声明队列和交换机** @author xiexu* @create 2022-11-17 10:17*/
@Configuration
public class MqConfig {/*** 声明一个交换机** @return*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);}/*** 声明新增或修改的队列** @return*/@Beanpublic Queue insertQueue() {return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);}/*** 声明删除的队列** @return*/@Beanpublic Queue deleteQueue() {return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);}/*** 声明新增队列与交换机的绑定关系** @return*/@Beanpublic Binding insertQueueBinding() {return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);}/*** 声明删除队列与交换机的绑定关系** @return*/@Beanpublic Binding deleteQueueBinding() {return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}}

发送MQ消息

在 hotel-admin 中的增、删、改业务中分别发送MQ消息:

@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id) {return hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page, @RequestParam(value = "size", defaultValue = "1") Integer size) {Page<Hotel> result = hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}/*** 新增** @param hotel*/@PostMappingpublic void saveHotel(@RequestBody Hotel hotel) {// 新增酒店hotelService.save(hotel);/*** 发送MQ消息* 第一个参数:交换机* 第二个参数:RoutinKey* 第三个参数:为了节省资源,只发送酒店id,消费者拿到后通过id查询mysql数据库,就能获取到插入的酒店数据了*/rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 更新** @param hotel*/@PutMapping()public void updateById(@RequestBody Hotel hotel) {if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);/*** 发送MQ消息* 第一个参数:交换机* 第二个参数:RoutinKey* 第三个参数:为了节省资源,只发送酒店id,消费者拿到后通过id查询mysql数据库,就能获取到插入的酒店数据了*/rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 删除** @param id*/@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);/*** 发送MQ消息* 第一个参数:交换机* 第二个参数:RoutinKey* 第三个参数:酒店id*/rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);}
}

接收MQ消息

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到es索引库
  • 删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据

操作

1、首先在 hotel-demo 的 cn.itcast.hotel.service 包下的 IHotelService 中添加新增、删除的业务

void insertById(Long id);void deleteById(Long id);

2、给 hotel-demo 中的 cn.itcast.hotel.service.impl 包下的 HotelService 中实现业务:

     @Overridepublic void insertById(Long id) {try {// 1.根据id查询酒店数据Hotel hotel = getById(id);// 2.转换为HotelDoc文档类型HotelDoc hotelDoc = new HotelDoc(hotel);// 3.转换成JSON格式String json = JSON.toJSONString(hotelDoc);// 1.创建Request对象IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());// 2.准备JSON文档request.source(json, XContentType.JSON);// 3.发送请求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {try {// 1.创建Request对象DeleteRequest request = new DeleteRequest("hotel", id.toString());// 2.发送请求client.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}

3、编写监听器

在 hotel-demo 中的 cn.itcast.hotel.mq 包下新增一个类:

/*** @author xiexu* @create 2022-11-17 10:37*/
@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务** @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) // 监听的队列名称public void listenHotelInsertOrUpdate(Long id) {hotelService.insertById(id);}/*** 监听酒店删除的业务** @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) // 监听的队列名称public void listenHotelDelete(Long id) {hotelService.deleteById(id);}}

4、elasticsearch集群

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分成 N 个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份(replica)

ES集群相关概念:

  • 集群(cluster):一组拥有共同的 cluster name 的 节点。
  • 节点(node) :集群中的一个 Elasticearch 实例
  • 分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中

解决问题:数据量太大,单点存储量有限的问题。

此处,我们把数据分成3片:shard0、shard1、shard2

  • 主分片(Primary shard):相对于副本分片的定义。
  • 副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样。

数据备份可以保证高可用,但是每个分片备份一份,所需要的节点数量就会翻一倍,成本实在是太高了!

为了在高可用和成本间寻求平衡,我们可以这样做:

  • 首先对数据进行分片,存储到不同节点
  • 然后对每个分片进行备份,放到对方节点,完成互相备份

这样就可以大大减少所需要的服务节点数量,如图,我们以3分片,每个分片备份一份为例:

现在,每个分片都有1个备份,存储在3个节点:

  • node0:保存了分片0和1
  • node1:保存了分片0和2
  • node2:保存了分片1和2

4.1、部署ES集群

我们会在单机上利用 Docker 容器运行多个 Elasticsearch 实例来模拟集群。

可以直接使用 docker-compose 来完成,不过这要求你的 Linux 服务器至少有 4G ****以上的内存空间。

1、首先编写一个 docker-compose 文件,内容如下:

version: '2.2'
services:es01:image: elasticsearch:7.12.1 # 镜像container_name: es01 # 容器名称environment: # 环境变量- node.name=es01 # 节点名称- cluster.name=es-docker-cluster # 集群名称- discovery.seed_hosts=es02,es03 # 集群内其他节点的ip地址,因为docker容器内互联,所以直接写容器名称就可以了- cluster.initial_master_nodes=es01,es02,es03 # 初始化的主节点,表示这三台es节点可以参与选举- "ES_JAVA_OPTS=-Xms512m -Xmx512m" # JVM堆内存大小volumes: # 数据卷- data01:/usr/share/elasticsearch/dataports: # 端口映射- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports: # 端口映射- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/dataports: # 端口映射- 9202:9200networks:- elasticvolumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

示例:

version: '2.2'
services:es01:image: elasticsearch:7.12.1container_name: es01environment:- node.name=es01- cluster.name=es-docker-cluster- discovery.seed_hosts=es02,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data01:/usr/share/elasticsearch/dataports:- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports: - 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/dataports:- 9202:9200networks:- elasticvolumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

2、es运行需要修改 Linux 系统权限,修改 /etc/sysctl.conf 文件

vi /etc/sysctl.conf

3、添加下面的内容:

vm.max_map_count=262144

4、然后执行命令,让配置生效

sysctl -p

5、通过 docker-compose 启动集群:

docker-compose up -d

6、查看每个es节点的日志

docker logs -f es01docker logs -f es02docker logs -f es03

4.2、集群状态监控

kibana 可以监控 es 集群状态,不过新版本需要依赖 es 的 x-pack 功能,配置比较复杂。

这里推荐使用 cerebro 来监控 es 集群状态,官方网站:https://github.com/lmenezes/cerebro

下载后解压打开 bin 目录下的 cerebro

访问 http://localhost:9000 即可进入管理界面

绿色的线条代表es集群处于健康状态

4.3、创建索引库

利用 kibana的DevTools创建索引库

在 DevTools 中输入指令:

PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量"number_of_replicas": 1 // 给每个分片添加的副本数量},"mappings": {"properties": {// mapping映射定义 ...}}
}

利用 cerebro 创建索引库

填写索引库信息:

回到首页,即可查看索引库分片效果:

4.4、集群职责划分

elasticsearch 中集群节点有不同的职责划分:

默认情况下,集群中的任何一个节点都同时具备上述四种角色。

真实的集群一定要将集群职责进行分离:

  • master 节点:对 CPU 要求高,但是对内存要求低
  • data 节点:对 CPU 和内存要求都高
  • coordinating 节点:对网络带宽、CPU 要求高

职责分离可以让我们根据不同节点的需求分配不同的硬件去部署,避免业务之间的互相干扰。

elasticsearch 中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色

一个典型的 es 集群职责划分如图:

LB指的是负载均衡器。

4.5、ES集群脑裂问题

默认情况下,每个节点都是 master eligible 节点(主节点),因此一旦 master 节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点发生网络故障时,可能发生脑裂问题。

例如在一个集群中,因为网络故障导致主节点与其它节点失联:

此时,node2 和 node3 认为 node1 宕机,就会重新选主:

当 node3 当选后,集群继续对外提供服务,node2 和 node3 自成集群,node1自成集群,两个集群数据不同步,出现数据差异。

当网络恢复后,因为集群中有两个 master 节点,集群状态的不一致,出现脑裂的情况:

为了避免发生脑裂问题,要求选票超过 (eligible节点数量 + 1)/ 2 才能当选为主,因此 eligible 节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在 es7.0 以后,已经成为默认配置,因此一般不会发生脑裂问题。

例如:3个节点形成的集群,选票必须超过 (3 + 1)/ 2 ,也就是2票。node3得到 node2 和 node3 的选票,当选为主。而 node1 只有自己 1 票,没有当选。集群中依然只有1个主节点,没有出现脑裂问题。

总结

master eligible节点的作用是什么?

  • 参与集群选主
  • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求

data节点的作用是什么?

  • 数据的CRUD

coordinator节点的作用是什么?

  • 路由请求到其它节点
  • 合并查询到的结果,返回给用户

4.6、集群分布式存储

当新增文档时,应该保存到不同的分片,保证数据均衡,那么 coordinating node 如何确定数据该存储到哪个分片呢?

分布式存储测试

插入三条数据:

测试可以看到,三条数据分别在不同的分片:

结果:

分布式存储原理

elasticsearch 会通过 hash 算法来计算文档应该存储到哪个分片上:

coordinating node 根据 id 做 hash 运算,得到的结果对分片数量取余,余数就是对应要存储的分片

说明:

  • _routing 默认是文档的 id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量就不能修改!

新增文档的流程如下:

深蓝色表示主分片,浅蓝色表示分片副本

  • 新增一个 id=1 的文档
  • 对 id 做 hash 运算,假如得到的结果是 2,则应该存储到 shard-2
  • shard-2 的主分片在 node3 节点,将数据路由到 node3 节点,由 node3 进行保存文档
  • 同步给 shard-2 的副本分片(R-2),在 node2 节点
  • 返回结果给 coordinating-node 节点(node1)

4.7、集群分布式查询

elasticsearch 的查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node 汇总 data node 的搜索结果,并处理为最终结果集返回给用户

4.8、集群故障转移

集群的 master 节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

例如一个集群结构如图,三个节点都是健康的。

现在 node1 是主节点,其它两个节点是从节点。突然,node1发生了故障:

宕机后的第一件事,需要重新选主,例如选中了node2:

node2成为主节点后,会检测集群监控状态,发现 P-1 没有副本分片,P-0 没有主分片。因此需要将 node1 上的数据迁移到 node2、node3,确保任何一个分片都至少有两份(一个主分片,一个副本分片):

总结:

  • 主节点(master)宕机后,候选主节点(EligibleMaster)选举为新的主节点。
  • 主节点(master)监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。

day07 Elasticsearch搜索引擎3相关推荐

  1. 第三百六十二节,Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)基本的索引和文档CRUD操作、增、删、改、查...

    第三百六十二节,Python分布式爬虫打造搜索引擎Scrapy精讲-elasticsearch(搜索引擎)基本的索引和文档CRUD操作.增.删.改.查 elasticsearch(搜索引擎)基本的索引 ...

  2. ElasticSearch搜索引擎: 内存分析与设置

    在 Elasticsearch 的运行过程中,如何合理分配与设置内存是一件十分重要的事情,否则十分容易出现各种问题. 一.Elasticsearch为什么吃内存: 我们先看下 ES 服务器的总体内存消 ...

  3. ElasticSearch搜索引擎常见面试题总结

    一.ElasticSearch基础: 1.什么是Elasticsearch: Elasticsearch 是基于 Lucene 的 Restful 的分布式实时全文搜索引擎,每个字段都被索引并可被搜索 ...

  4. 第三百六十节,Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)的基本概念...

    第三百六十节,Python分布式爬虫打造搜索引擎Scrapy精讲-elasticsearch(搜索引擎)的基本概念 elasticsearch的基本概念 1.集群:一个或者多个节点组织在一起 2.节点 ...

  5. day06 Elasticsearch搜索引擎2

    day06 Elasticsearch搜索引擎2 1.DSL查询文档 1.1.DSL查询分类 Elasticsearch提供了基于JSON的DSL(Domain Specific Language)来 ...

  6. ElasticSearch搜索引擎详解-持续更新中

    ElasticSearch搜索引擎详解 1. ElasticSearch概述 1.1 elasticsearch是什么 1.2 全文搜索引擎 1.3 elasticsearch and solr 1. ...

  7. Elasticsearch搜索引擎(一)——基础使用

    Elasticsearch搜索引擎 关键词是中文的建议使用,英文和数字不要,模糊就行 如果普通数据库查询,无法解决如下问题 如果表记录上千万上亿了这个性能问题,另外一个如果有一个本文字段要在里面模糊配 ...

  8. 一步一步学爬虫(4)数据存储之Elasticsearch搜索引擎存储

    Elasticsearch搜索引擎存储 1. Elasticsearch 介绍 2. Elasticsearch 相关概念 3. 准备工作 3.1 下载程序 3.2 解压缩,配置文件修改 4. 创建索 ...

  9. php操作ElasticSearch搜索引擎流程详解

    更多python.php教程请到友情连接: 菜鸟教程https://www.piaodoo.com 茂名一技http://www.enechn.com ppt制作教程步骤 http://www.tpy ...

最新文章

  1. jsp中九大内置对象
  2. 校园安全责任重大 安防守护迭代升级
  3. visual studio 2015开发nodejs教程1搭建环境
  4. xhtml的行内描述性元素
  5. [LeetCode] 4Sum II 四数之和之二
  6. JDK 12:实际中的切换语句/表达式
  7. 将原生SQL功能Hibernate到您的Spring Data Repository中
  8. 前端学习(1667):前端系列实战课程之拖拽
  9. 命令行参数的作用_Rasa 聊天机器人专栏(二):命令行界面
  10. java 基本语法 二_java基础语法2
  11. 主流Ajax框架介绍
  12. cad补全三视图_机械制图课程中补全三视图的解题方法
  13. mysql按键精灵接口,mysql,按键精灵,读取写入
  14. ubuntu中安装微信
  15. 计算机主机自动关机如何设置,电脑怎么设置自动关机?电脑自动关机方法教程 电脑维修技术网...
  16. 当当年中庆,百万自营图书大放价,又有羊毛可以薅了
  17. Rust 官方入门程序(a Guessing Game)解读
  18. kurento和打洞的服务器的安装及部署
  19. ZOJ3587 Marlon's String KMP技巧处理
  20. android iOS App客户端如何实现在线支付

热门文章

  1. 国开《工业通风及除尘》终结性考试
  2. 理解悲观锁乐观锁、同步锁、读锁、写锁
  3. 《趣味知识博文》小W与小L带你聊天式备考CDA Level Ⅰ(二)
  4. oracle11g r2 64 补丁,win10系统下oracle11g R2的64位版本安装教程
  5. 小米商城抢购脚本_小米10系列MiCare保障服务上线:免费2次换屏 549元起
  6. 面试题目之:mvvm框架是什么?它与其他框架(jquery)的区别是什么?哪些场景适合?
  7. 大数据开发常用的编程语言有哪些?
  8. 【爬虫+多线程+MySQL】网抑云音乐评论爬取
  9. matlab 波前像差,波前像差(波阵面像差)的基本知识
  10. 2023最全软件测试学习路线图(从入门到精通)