乐忧商城

  • 13.搜索过滤
    • 13.1 生成分类和品牌过滤
    • 13.3 生成规格参数过滤
    • 13.4 过滤条件的筛选
    • 13.5 页面展示选择的过滤项
    • 13.6 取消过滤项
    • 13.7 优化
  • 14.thymeleaf及其静态化
    • 14.1 商品详情
    • 14.2 页面静态化
  • 15.RabbitMQ
    • 15.1 RabbitMQ简介
    • 15.2 五种消息模型
    • 15.3 Spring AMQP
    • 15.4 项目改造

13.搜索过滤

13.1 生成分类和品牌过滤

对于过滤功能,先看一下想要实现的效果:

整个过滤部分有3块:

  • 顶部的导航,已经选择的过滤条件展示:

    • 商品分类面包屑,根据用户选择的商品分类变化
    • 其它已选择过滤参数
  • 过滤条件展示,又包含3部分
    • 商品分类展示
    • 品牌展示
    • 其它规格参数
  • 展开或收起的过滤条件的按钮

顶部导航要展示的内容跟用户选择的过滤条件有关。

  • 比如用户选择了某个商品分类,则面包屑中才会展示具体的分类
  • 比如用户选择了某个品牌,列表中才会有品牌信息。

所以,这部分需要依赖第二部分:过滤条件的展示和选择。因此我们先不着急去做。展开或收起的按钮是否显示,取决于过滤条件有多少,如果很少,那么就没必要展示。所以也是跟第二部分的过滤条件有关。

这样分析来看,我们必须先做第二部分:过滤条件展示。

先来看分类和品牌。在我们的数据库中已经有所有的分类和品牌信息。在这个位置,是不是把所有的分类和品牌信息都展示出来呢?显然不是,用户搜索的条件会对商品进行过滤,而在搜索结果中,不一定包含所有的分类和品牌,直接展示出所有商品分类,让用户选择显然是不合适的。无论是分类信息,还是品牌信息,都应该从搜索的结果商品中进行聚合得到。

原来,我们返回的结果是PageResult对象,里面只有total、totalPage、items3个属性。但是现在要对商品分类和品牌进行聚合,数据显然不够用,我们需要对返回的结果进行扩展,添加分类和品牌的数据,由于后面也要返回聚合后的规格参数,因此这里统一扩展SearchResult类。

package com.leyou.search.pojo;import com.leyou.common.pojo.PageResult;
import com.leyou.item.pojo.Brand;import java.util.List;
import java.util.Map;public class SearchResult extends PageResult<Goods> {private List<Brand> brands;private List<Map<String,Object>> categories;private List<Map<String,Object>> specs;public List<Map<String, Object>> getSpecs() {return specs;}public void setSpecs(List<Map<String, Object>> specs) {this.specs = specs;}public SearchResult(Long total, List<Goods> items, List<Brand> brands, List<Map<String, Object>> categories, List<Map<String, Object>> specs) {super(total, items);this.brands = brands;this.categories = categories;this.specs = specs;}public SearchResult(Long total, Integer totalPage, List<Goods> items, List<Brand> brands, List<Map<String, Object>> categories, List<Map<String, Object>> specs) {super(total, totalPage, items);this.brands = brands;this.categories = categories;this.specs = specs;}public SearchResult(List<Brand> brands, List<Map<String, Object>> categories, List<Map<String, Object>> specs) {this.brands = brands;this.categories = categories;this.specs = specs;}public List<Brand> getBrands() {return brands;}public void setBrands(List<Brand> brands) {this.brands = brands;}public List<Map<String, Object>> getCategories() {return categories;}public void setCategories(List<Map<String, Object>> categories) {this.categories = categories;}
}

修改SearchServiceImpl:

public SearchResult search(SearchRequest request) {// 判断查询条件if (StringUtils.isBlank(request.getKey())) {// 返回默认结果集return null;}// 初始化自定义查询构建器NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();// 添加查询条件queryBuilder.withQuery(QueryBuilders.matchQuery("all", request.getKey()).operator(Operator.AND));// 添加结果集过滤,只需要:id,subTitle, skusqueryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id", "subTitle", "skus"}, null));// 获取分页参数Integer page = request.getPage();Integer size = request.getSize();// 添加分页queryBuilder.withPageable(PageRequest.of(page - 1, size));String categoryAggName = "categories";String brandAggName = "brands";queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));// 执行搜索,获取搜索的结果集AggregatedPage<Goods> goodsPage = (AggregatedPage<Goods>)this.goodsReponsitory.search(queryBuilder.build());// 解析聚合结果集List<Map<String, Object>> categories = getCategoryAggResult(goodsPage.getAggregation(categoryAggName));List<Brand> brands = getBrandAggResult(goodsPage.getAggregation(brandAggName));// 封装成需要的返回结果集return new SearchResult(goodsPage.getContent(), goodsPage.getTotalElements(), goodsPage.getTotalPages(), categories, brands);
}
/*** 解析品牌聚合结果集* @param aggregation* @return*/
private List<Brand> getBrandAggResult(Aggregation aggregation) {// 处理聚合结果集LongTerms terms = (LongTerms)aggregation;// 获取所有的品牌id桶List<LongTerms.Bucket> buckets = terms.getBuckets();// 定义一个品牌集合,搜集所有的品牌对象List<Brand> brands = new ArrayList<>();// 解析所有的id桶,查询品牌buckets.forEach(bucket -> {Brand brand = this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue());brands.add(brand);});return brands;// 解析聚合结果集中的桶,把桶的集合转化成id的集合// List<Long> brandIds = terms.getBuckets().stream().map(bucket -> bucket.getKeyAsNumber().longValue()).collect(Collectors.toList());// 根据ids查询品牌//return brandIds.stream().map(id -> this.brandClient.queryBrandById(id)).collect(Collectors.toList());// return terms.getBuckets().stream().map(bucket -> this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue())).collect(Collectors.toList());
}/*** 解析分类* @param aggregation* @return*/
private List<Map<String,Object>> getCategoryAggResult(Aggregation aggregation) {// 处理聚合结果集LongTerms terms = (LongTerms)aggregation;// 获取所有的分类id桶List<LongTerms.Bucket> buckets = terms.getBuckets();// 定义一个品牌集合,搜集所有的品牌对象List<Map<String, Object>> categories = new ArrayList<>();List<Long> cids = new ArrayList<>();// 解析所有的id桶,查询品牌buckets.forEach(bucket -> {cids.add(bucket.getKeyAsNumber().longValue());});List<String> names = this.categoryClient.queryNamesByIds(cids);for (int i = 0; i < cids.size(); i++) {Map<String, Object> map = new HashMap<>();map.put("id", cids.get(i));map.put("name", names.get(i));categories.add(map);}return categories;
}

页面渲染数据结构
我们可以把所有的过滤条件放入一个数组中,然后在页面利用v-for遍历一次生成。
其基本结构是这样的:

[{k:"过滤字段名",options:[{/*过滤字段值对象*/},{/*过滤字段值对象*/}]}
]

我们先在data中定义数组:filters,等待组装过滤参数:

data: {ly,search:{key: "",page: 1},goodsList:[], // 接收搜索得到的结果total: 0, // 总条数totalPage: 0, // 总页数filters:[] // 过滤参数集合
},

然后在查询搜索结果的回调函数中,对过滤参数进行封装:

页面渲染数据
我们注意到,虽然页面元素是一样的,但是品牌会比其它搜索条件多出一些样式,因为品牌是以图片展示。需要进行特殊处理。数据展示是一致的,我们采用v-for处理:

<div class="type-wrap" v-for="(f,i) in filters" :key="i" v-if="f.k !== '品牌'"><div class="fl key">{{f.k}}</div><div class="fl value"><ul class="type-list"><li v-for="(option, j) in f.options" :key="j"><a>{{option.name}}</a></li></ul></div><div class="fl ext"></div>
</div>
<div class="type-wrap logo" v-else><div class="fl key brand">{{f.k}}</div><div class="value logos"><ul class="logo-list"><li v-for="(option, j) in f.options" v-if="option.image"><img :src="option.image" /></li><li style="text-align: center" v-else><a style="line-height: 30px; font-size: 12px" href="#">{{option.name}}</a></li></ul></div><div class="fl ext"><a href="javascript:void(0);" class="sui-btn">多选</a></div>
</div>

13.3 生成规格参数过滤

有四个问题需要先思考清楚:

  • 什么时候显示规格参数过滤? 分类只有一个
  • 如何知道哪些规格需要过滤?
  • 要过滤的参数,其可选值是如何获取的?
  • 规格过滤的可选值,其数据格式怎样的?

1.什么情况下显示有关规格参数的过滤?
如果用户尚未选择商品分类,或者聚合得到的分类数大于1,那么就没必要进行规格参数的聚合。因为不同分类的商品,其规格是不同的。因此,我们在后台需要对聚合得到的商品分类数量进行判断,如果等于1,我们才继续进行规格参数的聚合。
**2.如何知道哪些规格需要过滤? **
我们不能把数据库中的所有规格参数都拿来过滤。因为并不是所有的规格参数都可以用来过滤,参数的值是不确定的。我们在设计规格参数时,已经标记了某些规格可搜索,某些不可搜索。因此,一旦商品分类确定,我们就可以根据商品分类查询到其对应的规格,从而知道哪些规格要进行搜索。
3.要过滤的参数,其可选值是如何获取的?
虽然数据库中有所有的规格参数,但是不能把一切数据都用来供用户选择。与商品分类和品牌一样,应该是从用户搜索得到的结果中聚合,得到与结果品牌的规格参数可选值。
4.要过滤的可选值,其数据格式怎样的?

具体怎么实现呢?总结为以下五步:

  • 1)用户搜索得到商品,并聚合出商品分类
  • 2)判断分类数量是否等于1,如果是则进行规格参数聚合
  • 3)先根据分类,查找可以用来搜索的规格
  • 4)对规格参数进行聚合
  • 5)将规格参数聚合结果整理后返回

具体的实现过程就不一一分析了,直接贴上代码即可:
GoodsServiceImpl类:

package com.leyou.search.service.impl;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.leyou.common.pojo.PageResult;
import com.leyou.item.pojo.*;
import com.leyou.search.client.BrandClient;
import com.leyou.search.client.CategoryClient;
import com.leyou.search.client.GoodsClient;
import com.leyou.search.client.SpecificationClient;
import com.leyou.search.pojo.Goods;
import com.leyou.search.pojo.SearchRequest;
import com.leyou.search.pojo.SearchResult;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.service.SearchService;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;@Service
public class SearchServiceImpl implements SearchService {@Autowiredprivate BrandClient brandClient;@Autowiredprivate CategoryClient categoryClient;@Autowiredprivate GoodsClient goodsClient;@Autowiredprivate SpecificationClient specificationClient;@Autowiredprivate GoodsRepository goodsRepository;private static final ObjectMapper MAPPER = new ObjectMapper();/*** 将spu转化为Goods* @param spu* @return*/@Overridepublic Goods spuToGoods(Spu spu) throws IOException {Goods goods = new Goods();goods.setId(spu.getId());List<Long> ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());List<String> names = this.categoryClient.queryNamesById(ids);String brandName = this.brandClient.queryByid(spu.getBrandId()).getName();String all = spu.getTitle() + " " + StringUtils.join(names, " ") + " " + brandName;goods.setAll(all);goods.setSubTitle(spu.getSubTitle());goods.setBrandId(spu.getBrandId());goods.setCid1(spu.getCid1());goods.setCid2(spu.getCid2());goods.setCid3(spu.getCid3());goods.setCreateTime(spu.getCreateTime());List<Long> prices = new ArrayList<>();List<Map<String,Object>> skus = new ArrayList<>();List<Sku> skusList = this.goodsClient.querySkusBySpuId(spu.getId());skusList.forEach(sku -> {prices.add(sku.getPrice());Map<String,Object> map = new HashMap<>();map.put("id",sku.getId());map.put("title",sku.getTitle());String images = sku.getImages();map.put("images",StringUtils.isEmpty(images)?"":images.split(",")[0]);map.put("price",sku.getPrice());skus.add(map);});goods.setPrice(prices);goods.setSkus(MAPPER.writeValueAsString(skus));Map<String,Object> specs = new HashMap<>();List<SpecParam> params = this.specificationClient.queryParams(null, spu.getCid3(), null, true);SpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spu.getId());String genericSpec = spuDetail.getGenericSpec();String specialSpec = spuDetail.getSpecialSpec();Map<String,Object> genericSpecMap = MAPPER.readValue(genericSpec, new TypeReference<Map<String, Object>>() {});Map<String,List<Object>> specialSpecMap = MAPPER.readValue(specialSpec, new TypeReference<Map<String, List<Object>>>() {});params.forEach(param -> {if(param.getGeneric()){String value = genericSpecMap.get(param.getId().toString()).toString();if(param.getNumeric()){value = chooseSegment(value, param);}specs.put(param.getName(),value);}else{String value = specialSpecMap.get(param.getId().toString()).toString();specs.put(param.getName(),value);}});goods.setSpecs(specs);return goods;}/*** 根据searchRequest对象查找指定的goods集合* @param searchRequest* @return*/@Overridepublic SearchResult search(SearchRequest searchRequest) {//获得搜索的关键字String key = searchRequest.getKey();if(StringUtils.isEmpty(key)){return null;}NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();//QueryBuilder basicQuery = QueryBuilders.matchQuery("all", searchRequest.getKey()).operator(Operator.AND);QueryBuilder basicQuery = buildBoolQueryBuilder(searchRequest);queryBuilder.withQuery(basicQuery);queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id","subTitle","skus"},null));queryBuilder.withPageable(PageRequest.of(searchRequest.getPage()-1,searchRequest.getSize()));//添加品牌聚合String brandAggName="brands";queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));//添加分类聚合String categoryAggName="categories";queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));//获取查询结果AggregatedPage<Goods> goods = (AggregatedPage<Goods>) this.goodsRepository.search(queryBuilder.build());//将查询结果转化成需要的形式List<Brand> brands = getBrandAggResult(goods.getAggregation(brandAggName));List<Map<String,Object>> categories = getCategoryAggResult(goods.getAggregation(categoryAggName));List<Map<String,Object>> specs = null;//如果分类只有一个就对相应的规格参数进行聚合,如果分类有多个则对所有规格参数进行聚合没有意义且浪费时间if(!CollectionUtils.isEmpty(categories) && categories.size() == 1){specs = getSpecsAggResult((Long)categories.get(0).get("id"), basicQuery);}SearchResult result = new SearchResult(goods.getTotalElements(), goods.getTotalPages(), goods.getContent(),brands,categories,specs);return result;}/*** 将值转化为区间表示,以方便搜索* @param value* @param p* @return*/private String chooseSegment(String value, SpecParam p) {double val = NumberUtils.toDouble(value);String result = "其它";// 保存数值段for (String segment : p.getSegments().split(",")) {String[] segs = segment.split("-");// 获取数值范围double begin = NumberUtils.toDouble(segs[0]);double end = Double.MAX_VALUE;if(segs.length == 2){end = NumberUtils.toDouble(segs[1]);}// 判断是否在范围内if(val >= begin && val < end){if(segs.length == 1){result = segs[0] + p.getUnit() + "以上";}else if(begin == 0){result = segs[1] + p.getUnit() + "以下";}else{result = segment + p.getUnit();}break;}}return result;}/*** 返回品牌聚合结果集* @param aggregation* @return*/private List<Brand> getBrandAggResult(Aggregation aggregation){// 强制转换LongTerms longTerms = (LongTerms) aggregation;return longTerms.getBuckets().stream().map(bucket -> this.brandClient.queryByid(bucket.getKeyAsNumber().longValue())).collect(Collectors.toList());}/*** 返回分类聚合结果集* @param aggregation* @return*/private List<Map<String,Object>> getCategoryAggResult(Aggregation aggregation){LongTerms longTerms = (LongTerms) aggregation;return longTerms.getBuckets().stream().map(bucket -> {Map<String,Object> map = new HashMap<>();Long id = bucket.getKeyAsNumber().longValue();List<String> names = this.categoryClient.queryNamesById(Arrays.asList(id));map.put("id",id);map.put("name",names.get(0));return map;}).collect(Collectors.toList());}/*** 返回规格参数的聚合结果* @param cid* @return*/private List<Map<String,Object>> getSpecsAggResult(Long cid, QueryBuilder basicQuery){List<Map<String,Object>> specs = new ArrayList<>();//根据cid查询所有的规格参数List<SpecParam> params = this.specificationClient.queryParams(null, cid, null, true);//构造新的查询NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();//传入查询条件queryBuilder.withQuery(basicQuery);//添加过滤集queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{},null));//添加聚合查询params.forEach(param -> {//将规格参数名称作为聚合名称queryBuilder.addAggregation(AggregationBuilders.terms(param.getName()).field("specs." + param.getName() + ".keyword"));});//执行查询AggregatedPage<Goods> search = (AggregatedPage<Goods>)this.goodsRepository.search(queryBuilder.build());//处理查询结果Map<String, Aggregation> aggregationMap = search.getAggregations().asMap();for (Map.Entry<String, Aggregation> entry : aggregationMap.entrySet()) {Map<String,Object> map = new HashMap<>();String key = entry.getKey();StringTerms value = (StringTerms)entry.getValue();map.put("key", key);List<String> collect = value.getBuckets().stream().map(bucket -> bucket.getKeyAsString()).collect(Collectors.toList());map.put("options", collect);specs.add(map);}return specs;}/*** 创建布尔查询* @param searchRequest* @return*/private QueryBuilder buildBoolQueryBuilder(SearchRequest searchRequest){BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();// 添加must条件boolQueryBuilder.must(QueryBuilders.matchQuery("all",searchRequest.getKey()).operator(Operator.AND));// 添加filter条件Map<String, Object> filter = searchRequest.getFilter();if(filter != null) {for (Map.Entry<String, Object> entry : filter.entrySet()) {String key = entry.getKey();String term;//将key转换成相应的查询字段if (StringUtils.equals("品牌", key)) {term = "brandId";} else if (StringUtils.equals("分类", key)) {term = "cid3";} else {term = "specs." + key + ".keyword";}boolQueryBuilder.filter(QueryBuilders.termQuery(term, entry.getValue()));}}return boolQueryBuilder;}
}

页面渲染
首先把后台传递过来的specs添加到filters数组:要注意:分类、品牌的option选项是对象,里面有name属性,而specs中的option是简单的字符串,所以需要进行封装,变为相同的结构:

data.specs.forEach(spec => {spec.options = spec.options.map(o => ({name: o}));_this.filters.push(spec);});

最后渲染的结果如下:

展示或收起过滤条件
如果感觉显示的太多了,我们可以通过按钮点击来展开和隐藏部分内容:


我们在data中定义变量,记录展开或隐藏的状态:

然后在按钮绑定点击事件,以改变show的取值:

在展示规格时,对show进行判断:

13.4 过滤条件的筛选

当我们点击页面的过滤项,要做哪些事情?

  • 把过滤条件保存在search对象中(watch监控到search变化后就会发送到后台)
  • 在页面顶部展示已选择的过滤项
  • 把商品分类展示到顶部面包屑
    保存过滤项
    我们把已选择的过滤项保存在search中:

    这里有一个很隐蔽的坑要注意,在created构造函数中会对search进行初始化,所以要在构造函数中对filter进行初始化,否则又会出现filter条件变了但是监听不生效的情况,因为钩子函数初始化search时没有filter对象,导致后面即使filter发生变化也不会被监听到

    search.filter是一个对象,结构:
{"过滤项名":"过滤项值"
}

然后给所有的过滤项绑定点击事件:

要注意,点击事件传2个参数:

  • k:过滤项的key
  • option:当前过滤项对象

在点击事件中,保存过滤项到selectedFilter:

selectFilter(k, o){const obj = {};Object.assign(obj, this.search);if(k === '分类' || k === '品牌'){o = o.id;}obj.filter[k] = o.name || o;this.search = obj;
}

另外,这里search对象中嵌套了filter对象,请求参数格式化时需要进行特殊处理,修改common.js中的一段代码:

关于后台代码的实现,已经粘贴过了,具体的实现步骤就直接看代码吧!

13.5 页面展示选择的过滤项

当用户选择一个商品分类以后,我们应该在过滤模块的上方展示一个面包屑,把三级商品分类都显示出来。用户选择的商品分类就存放在search.filter中,但是里面只有第三级分类的id:cid3,我们需要根据它查询出所有三级分类的id及名称。于是我们需要在后台提供相应的接口。

在页面重新加载完毕后,此时因为过滤条件中加入了商品分类的条件,所以查询的结果中只有1个分类。我们判断商品分类是否只有1个,如果是,则查询三级商品分类,添加到面包屑即可。

然后渲染:

其它过滤项
接下来,我们需要在页面展示用户已选择的过滤项,如图:

我们知道,所有已选择过滤项都保存在search.filter中,因此在页面遍历并展示即可。
基本有四类数据:

  • 商品分类:这个不需要展示,分类展示在面包屑位置
  • 品牌:这个要展示,但是其key和值不合适,我们不能显示一个id在页面。需要找到其name值
  • 数值类型规格:这个展示的时候,需要把单位查询出来
  • 非数值类型规格:这个直接展示其值即可

具体怎么实现也并不难,这里就略了。
隐藏已经选择的过滤项
现在,我们已经实现了已选择过滤项的展示,但是你会发现一个问题:已经选择的过滤项,在过滤列表中依然存在:这些已经选择的过滤项,应该从列表中移除。怎么做呢?你必须先知道用户选择了什么。用户选择的项保存在search.filter中:我们可以编写一个计算属性,把filters中的 已经被选择的key过滤掉:

computed:{remainFilters(){const keys = Object.keys(this.search.filter);if(this.search.filter.cid3){keys.push("分类")}if(this.search.filter.brandId){keys.push("品牌")}return this.filters.filter(f => !keys.includes(f.k));}
}

然后页面不再直接遍历filters,而是遍历remainFilters

最后发现,还剩下一堆没选过的。但是都只有一个可选项,此时再过滤没有任何意义,应该隐藏,所以,在刚才的过滤条件中,还应该添加一条:如果只剩下一个可选项,不显示

13.6 取消过滤项

我们能够看到,每个过滤项后面都有一个小叉,当点击后,应该取消对应条件的过滤。
思路非常简单:

  • 给小叉绑定点击事件
  • 点击后把过滤项从search.filter中移除,页面会自动刷新,OK

    绑定点击事件时,把k传递过去,方便删除
removeFilter(k){this.search.filter[k] = null;
}

13.7 优化

搜索系统需要优化的点:

  • 查询规格参数部分可以添加缓存
  • 聚合计算interval变化频率极低,所以可以设计为定时任务计算(周期为天),然后缓存起来。
  • elasticsearch本身有查询缓存,可以不进行优化
  • 商品图片应该采用缩略图,减少流量,提高页面加载速度
  • 图片采用延迟加载
  • 图片还可以采用CDN服务器
  • sku信息应该在页面异步加载,而不是放到索引库

14.thymeleaf及其静态化

14.1 商品详情

当用户搜索到商品,肯定会点击查看,就会进入商品详情页,接下来我们完成商品详情页的展示。在商品详情页中,我们会使用到Thymeleaf来渲染页面,所以需要先了解Thymeleaf的语法。其实我觉得Thymeleaf和jsp语法也差不多,学了不用就忘,然后再学,唉。

商品详情浏览量比较大,并发高,我们会独立开启一个微服务,用来展示商品详情,该微服务命名为leyou-goods-web,依然三步走。
1.引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>leyou</artifactId><groupId>com.leyou.parent</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><groupId>com.leyou.goods</groupId><artifactId>leyou-goods-web</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.leyou.item</groupId><artifactId>leyou-item-interface</artifactId><version>1.0.0-SNAPSHOT</version></dependency><dependency><groupId>com.leyou.common</groupId><artifactId>leyou-common</artifactId><version>1.0.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies></project>

2.添加配置文件application.yml

server:port: 8084
spring:application:name: goods-webthymeleaf:cache: falserabbitmq:host: 192.168.124.121virtual-host: /leyouusername: leyoupassword: leyou
eureka:client:service-url:defaultZone: http://127.0.0.1:10086/eurekaregistry-fetch-interval-seconds: 5

3.编写引导类:

package com.leyou;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class LeyouGoodsWebApplication {public static void main(String[] args) {SpringApplication.run(LeyouGoodsWebApplication.class,args);}
}

修改页面跳转路径

在zuul网关组件中添加路由:

zuul:prefix: /apiroutes:item-service: /item/** #路由到商品的微服务search-service: /search/** #路由到搜索的微服务goods-web: /goods/** #路由到商品详情的微服务user-service: /user/** #路由到用户的微服务auth-service: /auth/** #路由到授权的微服务cart-service: /cart/** #路由到购物车的微服务order-service: /order/** #路由到订单的微服务add-host-header: true #携带请求本身的head头信息sensitive-headers: # 配置禁止使用的头信息,这里设置为null,否则set-cookie无效

然后编写GoodsController类:

package com.leyou.goods.controller;import com.leyou.goods.service.GoodsService;
import com.leyou.goods.service.impl.GoodsHtmlServiceImpl;
import net.bytebuddy.asm.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;import java.util.Map;@Controller
public class GoodsController {@Autowiredprivate GoodsService goodsService;@Autowiredprivate GoodsHtmlServiceImpl goodsHtmlService;@GetMapping("/item/{id}.html")public String queryDetailsOfItem(@PathVariable("id")Long id, Model model){Map<String,Object> map = this.goodsService.queryDetailsOfItem(id);model.addAllAttributes(map);goodsHtmlService.createHtml(id);return "item";}
}

GoodsServiceImpl类:

package com.leyou.goods.service.impl;import com.leyou.goods.client.BrandClient;
import com.leyou.goods.client.CategoryClient;
import com.leyou.goods.client.GoodsClient;
import com.leyou.goods.client.SpecificationClient;
import com.leyou.goods.service.GoodsService;
import com.leyou.item.pojo.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;import java.util.*;@Service
public class GoodsServiceImpl implements GoodsService {@Autowiredprivate BrandClient brandClient;@Autowiredprivate CategoryClient categoryClient;@Autowiredprivate GoodsClient goodsClient;@Autowiredprivate SpecificationClient specificationClient;/*** 1.三个分类 List<Map<String,Object>>categories* 2.一个品牌 brand* 3.spu(根据spu获取其title和subtitle名称)* 4.skus List<sku>需要获取sku的price,title,images* 5.spuDetail(商品介绍与售后保障)* 6.groups: List<SpecGroup> (SpecGroup对象里应该有List<SpecParam>) ,参数的值应该由sku确定(通用的参数和特殊的参数应该分情况赋值,通用的根据spuDetail即可,特殊的根据选中的sku赋值)* 有些参数是字符串格式,需要首先转化为json对象然后利用vue进行遍历,而不是直接用thymeleaf进行遍历,因为直接遍历无法遍历.* 7.paramMap List<Map<String,Object>> {"name":"特殊的规格参数名","content":[数组类型]} (查找所有的特殊规格参数List<SpecParam>,获取选定sku的own_spec)* generic_spec: {id:name,...}* special_spec:{id:["",""...],...}* @param spuId* @return*/@Overridepublic Map<String, Object> queryDetailsOfItem(Long spuId) {Map<String,Object> resultMap = new HashMap<>();//查询spuSpu spu = this.goodsClient.querySpuBySpuId(spuId);resultMap.put("spu",spu);//查询spuDetailSpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spuId);resultMap.put("spuDetail",spuDetail);//查询skusList<Sku> skus = this.goodsClient.querySkusBySpuId(spuId);resultMap.put("skus",skus);//查询品牌Brand brand = this.brandClient.queryByid(spu.getBrandId());resultMap.put("brand",brand);//查询分类List<Long> ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());List<String> names = this.categoryClient.queryNamesById(ids);List<Map<String,Object>> categories = new ArrayList<>();for (int i = 0; i < ids.size(); i++) {Map<String,Object> map = new HashMap<>();map.put("id",ids.get(i));map.put("name",names.get(i));categories.add(map);}resultMap.put("categories",categories);//查询规格参数组List<SpecGroup> specGroups = this.specificationClient.querySpecGroupsByCid(spu.getCid3());resultMap.put("groups",specGroups);//查询特殊规格参数List<SpecParam> params = this.specificationClient.queryParams(null, spu.getCid3(), false, null);Map<Long,String> paramMap = new HashMap<>();params.forEach(param -> {paramMap.put(param.getId(),param.getName());});resultMap.put("paramMap",paramMap);//返回结果return resultMap;}}

渲染商品列表

这个部分需要渲染的数据有5块:

  • sku图片
  • sku标题
  • 副标题
  • sku价格
  • 特有规格属性列表

其中,sku 的图片、标题、价格,都必须在用户选中一个具体sku后,才能渲染。而特有规格属性列表可以在spuDetail中查询到。而副标题则是在spu中,直接可以在页面渲染。因此,我们先对特有规格属性列表进行渲染。等用户选择一个sku,再通过js对其它sku属性渲染。

渲染规格属性列表
规格属性列表将来会有事件和动态效果。我们需要有js代码参与,不能使用Thymeleaf来渲染了。因此,这里我们用vue,不过需要先把数据放到js对象中,方便vue使用

我们在页面的head中,定义一个js标签,然后在里面定义变量,保存与sku相关的一些数据:

<script th:inline="javascript">// sku集合const skus = /*[[${skus}]]*/ [];// 规格参数id与name对const paramMap = /*[[${params}]]*/ {};// 特有规格参数集合const specialSpec = JSON.parse(/*[[${spuDetail.specialSpec}]]*/ "");
</script>
  • specialSpec:这是SpuDetail中唯一与Sku相关的数据
    因此我们并没有保存整个spuDetail,而是只保留了这个属性,而且需要手动转为js对象。
  • paramMap:规格参数的id和name键值对,方便页面根据id获取参数名
  • skus:sku集合

通过Vue渲染
我们把刚才获得的几个变量保存在Vue实例中:

然后在页面中渲染:

<div id="specification" class="summary-wrap clearfix"><dl v-for="(v,k) in specialSpec" :key="k"><dt><div class="fl title"><i>{{paramMap[k]}}</i></div></dt><dd v-for="(str,j) in v" :key="j"><a href="javascript:;" class="selected">{{str}}<span title="点击取消选择">&nbsp;</span></a></dd></dl>
</div>

数据成功渲染了。不过我们发现所有的规格都被勾选了。这是因为现在,每一个规格都有式:selected,我们应该只选中一个,让它的class样式为selected才对!

规格属性的筛选
每一个规格项是数组中的一个元素,因此我们只要保存被选择的规格项的索引,就能判断哪个是用户选择的了!我们需要一个对象来保存用户选择的索引,格式如下:

{"4":0,"12":0,"13":0
}

但问题是,第一次进入页面时,用户并未选择任何参数。因此索引应该有一个默认值,我们将默认值设置为0。我们在head的script标签中,对索引对象进行初始化:

然后将其保存在vue之中。
页面改造
我们在页面中,通过判断indexes的值来判断当前规格是否被选中,并且给规格绑定点击事件,点击规格项后,修改indexes中的对应值:

<div id="specification" class="summary-wrap clearfix"><dl v-for="(v,k) in specialSpec" :key="k"><dt><div class="fl title"><i>{{paramMap[k]}}</i></div></dt><dd v-for="(str,j) in v" :key="j"><a href="javascript:;" :class="{selected: j===indexes[k]}" @click="indexes[k]=j">{{str}}<span v-if="j===indexes[k]" title="点击取消选择">&nbsp;</span></a></dd></dl>
</div>

确定SKU
在我们设计sku数据的时候,就已经添加了一个字段:indexes,这其实就是规格参数的索引组合。而我们在页面中,用户点击选择规格后,就会把对应的索引保存起来,因此,我们可以根据这个indexes来确定用户要选择的sku。我们在vue中定义一个计算属性,来计算与索引匹配的sku:

computed:{sku(){const index = Object.values(this.indexes).join("_");return this.skus.find(s => s.indexes == index);}
}

渲染sku列表
既然已经拿到了用户选中的sku,接下来,就可以在页面渲染数据了

商品图片是一个字符串,以,分割,页面展示比较麻烦,所以我们编写一个计算属性:images(),将图片字符串变成数组:

computed: {sku(){const index = Object.values(this.indexes).join("_");return this.skus.find(s=>s.indexes==index);},images(){return this.sku.images ? this.sku.images.split(",") : [''];}
},

将页面改造成如下即可:

<div class="zoom"><!--默认第一个预览--><div id="preview" class="spec-preview"><span class="jqzoom"><img :jqimg="images[0]" :src="data:images[0]" width="400px" height="400px"/></span></div><!--下方的缩略图--><div class="spec-scroll"><a class="prev">&lt;</a><!--左右按钮--><div class="items"><ul><li v-for="(image, i) in images" :key="i"><img :src="data:image" :bimg="image" onmousemove="preview(this)" /></li></ul></div><a class="next">&gt;</a></div>
</div>

注意商品详情是HTML代码,我们不能使用 th:text,应该使用th:utext

<!--商品详情-->
<div class="intro-detail" th:utext="${spuDetail.description}">
</div>

规格包装
规格包装分成两部分:

  • 规格参数
  • 包装列表

而且规格参数需要按照组来显示
规格参数的值分为两部分:

  • 通用规格参数:保存在SpuDetail中的genericSpec中
  • 特有规格参数:保存在sku的ownSpec中

所以我们需要把这两部分值取出来,放到groups中。
从spuDetail中取出genericSpec并取出groups:

把genericSpec引入到Vue实例:

因为sku是动态的,所以我们编写一个计算属性,来进行值的组合:

groups(){groups.forEach(group => {group.params.forEach(param => {if(param.generic){// 通用属性,去spu的genericSpec中获取param.v = this.genericSpec[param.id] || '其它';}else{// 特有属性值,去SKU中获取param.v = JSON.parse(this.sku.ownSpec)[param.id]}})})return groups;
}

然后页面渲染:

<div class="Ptable"><div class="Ptable-item" v-for="group in groups" :key="group.name"><h3>{{group.name}}</h3><dl><div v-for="p in group.params"><dt>{{p.name}}</dt><dd>{{p.v + (p.unit || '')}}</dd></div></dl></div>
</div>

包装列表
包装列表在商品详情中,我们一开始并没有赋值到Vue实例中,但是可以通过Thymeleaf来渲染

<div class="package-list"><h3>包装清单</h3><p th:text="${spuDetail.packingList}"></p>
</div>

售后服务
售后服务也可以通过Thymeleaf进行渲染:

<div id="three" class="tab-pane"><p>售后保障</p><p th:text="${spuDetail.afterService}"></p>
</div>

14.2 页面静态化

问题分析
现在,我们的页面是通过Thymeleaf模板引擎渲染后返回到客户端。在后台需要大量的数据查询,而后渲染得到HTML页面。会对数据库造成压力,并且请求的响应时间过长,并发能力不高。那么怎么解决呢?首先我们能想到的就是缓存技术,比如之前学习过的Redis。不过Redis适合数据规模比较小的情况。假如数据量比较大,例如我们的商品详情页。每个页面如果10kb,100万商品,就是10GB空间,对内存占用比较大。此时就给缓存系统带来极大压力,如果缓存崩溃,接下来倒霉的就是数据库了。所以缓存并不是万能的,某些场景需要其它技术来解决,比如静态化。
什么是静态化
静态化是指把动态生成的HTML页面变为静态内容保存,以后用户的请求到来,直接访问静态页面,不再经过服务的渲染。而静态的HTML页面可以部署在nginx中,从而大大提高并发能力,减小tomcat压力。
如何实现静态化
目前,静态化页面都是通过模板引擎来生成,而后保存到nginx服务器来部署。常用的模板引擎比如:

  • Freemarker
  • Velocity
  • Thymeleaf

我们之前就使用的Thymeleaf,来渲染html返回给用户。Thymeleaf除了可以把渲染结果写入Response,也可以写到本地文件,从而实现静态化。

Thymeleaf实现静态化
先说下Thymeleaf中的几个概念:

  • Context:运行上下文
  • TemplateResolver:模板解析器
  • TemplateEngine:模板引擎

1.Context
上下文: 用来保存模型数据,当模板引擎渲染时,可以从Context上下文中获取数据用于渲染。当与SpringBoot结合使用时,我们放入Model的数据就会被处理到Context,作为模板渲染的数据使用。
2.TemplateResolver
模板解析器:用来读取模板相关的配置,例如:模板存放的位置信息,模板文件名称,模板文件的类型等等。当与SpringBoot结合时,TemplateResolver已经由其创建完成,并且各种配置也都有默认值,比如模板存放位置,其默认值就是:templates。比如模板文件类型,其默认值就是html。
3.TemplateEngine
模板引擎:用来解析模板的引擎,需要使用到上下文、模板解析器。分别从两者中获取模板中需要的数据,模板文件。然后利用内置的语法规则解析,从而输出解析后的文件。来看下模板引擎进行处理的函数:

templateEngine.process("模板名", context, writer);

三个参数:

  • 模板名称
  • 上下文:里面包含模型数据
  • writer:输出目的地的流

在输出时,我们可以指定输出的目的地,如果目的地是Response的流,那就是网络响应。如果目的地是本地文件,那就实现静态化了。而在SpringBoot中已经自动配置了模板引擎,因此我们不需要关心这个。现在我们做静态化,就是把输出的目的地改成本地文件即可!
GoodsHtmlServiceImpl类:

package com.leyou.goods.service.impl;import com.leyou.goods.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;@Service
public class GoodsHtmlServiceImpl {// thymeleaf启动器已经自动注入了模板引擎@Autowiredprivate TemplateEngine engine;@Autowiredprivate GoodsService goodsService;public void createHtml(Long spuId){//初始化运行环境Context context = new Context();//设置变量context.setVariables(this.goodsService.queryDetailsOfItem(spuId));PrintWriter printWriter = null;try {File file = new File("D:\\ApplicationProgram\\nginx-1.14.0\\html\\api\\goods\\item\\"+spuId+".html");printWriter = new PrintWriter(file);engine.process("item",context,printWriter);} catch (FileNotFoundException e) {e.printStackTrace();} finally {if(printWriter != null){printWriter.close();}}}
/*** 新建线程处理页面静态化* @param spuId*/public void asyncExcute(Long spuId) {ThreadUtils.execute(()->createHtml(spuId));/*ThreadUtils.execute(new Runnable() {@Overridepublic void run() {createHtml(spuId);}});*/}
}

线程工具类:

public class ThreadUtils {private static final ExecutorService es = Executors.newFixedThreadPool(10);public static void execute(Runnable runnable) {es.submit(runnable);}
}

什么时候创建静态文件
我们编写好了创建静态文件的service,那么问题来了:什么时候去调用它呢?想想这样的场景:假如大部分的商品都有了静态页面。那么用户的请求都会被nginx拦截下来,根本不会到达我们的leyou-goods-web服务。只有那些还没有页面的请求,才可能会到达这里。因此,如果请求到达了这里,我们除了返回页面视图外,还应该创建一个静态页面,那么下次就不会再来麻烦我们了。所以,我们在GoodsController中添加逻辑,去生成静态html文件:

@GetMapping("{id}.html")
public String toItemPage(@PathVariable("id")Long id, Model model){// 加载所需的数据Map<String, Object> map = this.goodsService.loadModel(id);// 把数据放入数据模型model.addAllAttributes(map);// 页面静态化this.goodsHtmlService.asyncExcute(id);return "item";
}

注意:生成html 的代码不能对用户请求产生影响,所以这里我们使用额外的线程进行异步创建。
nginx代理静态页面
接下来,我们修改nginx,让它对商品请求进行监听,指向本地静态页面,如果本地没找到,才进行反向代理:

server {listen       80;server_name  www.leyou.com;proxy_set_header X-Forwarded-Host $host;proxy_set_header X-Forwarded-Server $host;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;location /api/goods/item {# 先找本地# root html;# if (!-f $request_filename) { #请求的文件不存在,就反向代理proxy_pass http://127.0.0.1:10010;#    break;# }}location /api/goods {proxy_pass http://127.0.0.1:10010;proxy_connect_timeout 600;proxy_read_timeout 600;}location / {proxy_pass http://127.0.0.1:9002;proxy_connect_timeout 600;proxy_read_timeout 600;}}

实际测试时发现静态化之后访问速度大大增加。

15.RabbitMQ

15.1 RabbitMQ简介

目前我们已经完成了商品详情和搜索系统的开发。我们思考一下,是否存在问题?

  • 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
  • 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。
  • 商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化。

如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
这里有两种解决方案:

  • 方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态页面
  • 方案2:搜索服务和商品页面服务对外提供操作接口,后台在商品增删改后,调用接口

以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

所以,我们会通过另外一种方式来解决这个问题:消息队列

什么是消息队列
消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

结合前面所说的问题:

  • 商品服务对商品增删改以后,无需去操作索引库或静态页面,只是发送一条消息,也不关心消息被谁接收。
  • 搜索服务和静态页面服务接收消息,分别去处理索引库和静态页面。

如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。

MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

常见MQ产品

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

我将我的rabbitMQ安装在虚拟机上。

15.2 五种消息模型

创建一个新工程,并导入相关依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.rabbitmq</groupId><artifactId>itcast-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.2.RELEASE</version></parent><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

然后添加配置文件:

spring:rabbitmq:host: 192.168.124.121username: leyoupassword: leyouvirtual-host: /leyou

编写一个工具类,用于建立与rabbitMQ的连接:

package cn.itcast.rabbitmq.util;import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("192.168.124.121");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/leyou");factory.setUsername("leyou");factory.setPassword("leyou");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

1.基本消息模型
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
生产者:

package cn.itcast.rabbitmq.direct;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/*** 生产者,模拟为商品服务*/
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "商品删除了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}

消费者1:

package cn.itcast.rabbitmq.direct;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者1*/
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

消费者2:

package cn.itcast.rabbitmq.direct;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者2*/
public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_2";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

消息确认机制(ACK)
消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

哪种更好取决于消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
    如何手动ACK呢?
public class Recv2 {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}

注意最后一行代码

channel.basicConsume(QUEUE_NAME, false, consumer);

如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。

2.work消息模型
工作队列或者竞争消费者模式

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
所以避免消息堆积的方法有两种:

  • 1)采用workqueue,多个消费者监听同一队列。
  • 2)接收到消息以后,通过线程池,异步消费。

接下来我们来模拟这个流程:
生产者:

package cn.itcast.rabbitmq.work;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 生产者
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}

消费者1:

package cn.itcast.rabbitmq.work;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;// 消费者1
public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 设置每个消费者同时只能处理一条消息channel.basicQos(1);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");try {// 模拟完成任务的耗时:1000msThread.sleep(1000);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

消费者2:

package cn.itcast.rabbitmq.work;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;//消费者2
public class Recv2 {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 设置每个消费者同时只能处理一条消息channel.basicQos(1);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

能者多劳

  • 消费者1比消费者2的效率要低,一次任务的耗时较长
  • 然而两人最终消费的消息数量是一样的
  • 消费者2大量时间处于空闲状态,消费者1一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

怎么实现呢?

我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

订阅模型分类
在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。

1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.订阅模型-Fanout

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者:
两个变化:

  • 1) 声明Exchange,不再声明Queue
  • 2) 发送消息到Exchange,不再发送到Queue
package cn.itcast.rabbitmq.fanout;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello everyone";// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生产者] Sent '" + message + "'");channel.close();connection.close();}
}

消费者1:

package cn.itcast.rabbitmq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;//消费者1
public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}

消费者2:

package cn.itcast.rabbitmq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
// 消费者2
public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_2";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}

4.订阅模型-Direct
有选择性的接收消息.在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者:

package cn.itcast.rabbitmq.direct;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/*** 生产者,模拟为商品服务*/
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "商品删除了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}

消费者1:

package cn.itcast.rabbitmq.direct;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者1*/
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

消费者2:

package cn.itcast.rabbitmq.direct;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者2*/
public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_2";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

5.订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!

Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
通配符规则:

`#`:匹配一个或多个词
`*`:匹配不多不少恰好1个词

audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
生产者:
使用topic类型的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete:

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/*** 生产者,模拟为商品服务*/
public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为topic// 第三个参数表明需要持久化channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);// 消息内容String message = "新增商品 : id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品// 第三个参数可以写成MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化消息channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}

消费者1:

package cn.itcast.rabbitmq.topic;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者1*/
public class Recv {private final static String QUEUE_NAME = "topic_exchange_queue_1";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列// 第二个参数表示队列的持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

消费者2:

package cn.itcast.rabbitmq.topic;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;
/*** 消费者2*/
public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列// 第二个参数表示队列的持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

持久化
如何避免消息丢失?
1) 消费者的ACK机制。可以防止消费者丢失消息。
2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

是可以将消息进行持久化呢?
要将消息持久化,前提是:队列、Exchange都持久化
交换机持久化

队列持久化

消息持久化

15.3 Spring AMQP

Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。如何使用Spring-amqp呢?
添加依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加:

spring:rabbitmq:host: 192.168.124.121username: leyoupassword: leyouvirtual-host: /leyou

添加监听者:
在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

@Component
public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "true"),exchange = @Exchange(value = "spring.test.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"#.#"}))public void listen(String msg){System.out.println("接收到消息:" + msg);}
}
  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
AmqpTemplate
Spring最擅长的事情就是封装,把他人的框架进行封装和整合。Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

红框圈起来的是比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体
  • 指定消息
  • 指定RoutingKey和消息,会向默认的交换机发送消息

测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void testSend() throws InterruptedException {String msg = "hello, Spring boot amqp";this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);// 等待10秒后再结束Thread.sleep(10000);}
}

15.4 项目改造

接下来,我们就改造项目,实现搜索服务、商品静态页的数据同步。
思路分析
发送方:商品微服务

  • 什么时候发?
    当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。
  • 发送什么内容?
    对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

接收方:搜索微服务、静态页面微服务
接收消息后如何处理?

  • 搜索微服务:

    • 增/改:添加新的数据到索引库
    • 删:删除索引库数据
  • 静态页微服务:
    • 增/改:创建新的静态页
    • 删:删除原来的静态页

商品服务发送消息
1.引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件

spring:rabbitmq:host: 192.168.124.121username: leyoupassword: leyouvirtual-host: /leyoutemplate:exchange: leyou.item.exchangepublisher-confirms: true
  • template:有关AmqpTemplate的配置

    • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

3.改造GoodsService
在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)

private void sendMessage(Long id, String type){// 发送消息try {this.amqpTemplate.convertAndSend("item." + type, id);} catch (Exception e) {logger.error("{}商品消息发送异常,商品id:{}", type, id, e);}
}

这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange
注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
然后在新增的时候调用:

修改的时候调用:

搜索服务接收消息
搜索服务接收到消息后要做的事情:

  • 增:添加新的数据到索引库
  • 删:删除索引库数据
  • 改:修改索引库数据

因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。
引入依赖和添加配置我们跳过,直接进入监听器的编写:

package com.leyou.search.listener;import com.leyou.item.pojo.Spu;
import com.leyou.search.client.GoodsClient;
import com.leyou.search.pojo.Goods;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;//必须将listener注入spring容器
@Component
public class GoodsListener {@Autowiredprivate GoodsRepository goodsRepository;@Autowiredprivate GoodsClient goodsClient;@Autowiredprivate SearchService searchService;//添加rabbitMQ注解,指定交换机的类型,名称,队列名称以及其绑定的routingKey@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "LEYOU_SEARCH_SAVE_QUEUE", durable = "true"),exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert","item.update"}))// 异常一定要抛出,因为spring会根据该方法是否抛出异常来决定是否发送ACK确认,若没有抛出异常则spring自动帮我们发送确认,若抛出异常,则消息仍然会继续存在于// 消息队列之中public void save(Long id) throws IOException {if(id == null){return;}Spu spu = this.goodsClient.querySpuBySpuId(id);Goods goods = this.searchService.spuToGoods(spu);this.goodsRepository.save(goods);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "LEYOU_SEARCH_DELETE_QUEUE", durable = "true"),exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.delete"}))public void delete(Long id){if(id == null){return;}this.goodsRepository.deleteById(id);}
}

静态页面服务接收消息
商品静态页服务接收到消息后的处理:

  • 增:创建新的静态页
  • 删:删除原来的静态页
  • 改:创建新的静态页并覆盖原来的

不过,我们编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。

package com.leyou.goods.listener;import com.leyou.goods.service.impl.GoodsHtmlServiceImpl;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.File;@Component
public class GoodsListener {@Autowiredprivate GoodsHtmlServiceImpl goodsHtmlService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "LEYOU_GOODS_SAVE_QUEUE",durable = "true"),exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert","item.update"}))public void save(Long id){if(id == null){return;}this.goodsHtmlService.createHtml(id);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "LEYOU_GOODS_DELETE_QUEUE",durable = "true"),exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.delete"}))public void delete(Long id){if(id == null){return;}File file = new File("D:\\ApplicationProgram\\nginx-1.14.0\\html\\api\\goods\\item\\" + id + ".html");file.deleteOnExit();}
}

乐忧商城项目总结-4相关推荐

  1. 乐忧商城项目总结-1

    乐忧商城 1.springboot 1.1 springboot基本介绍 1.2 springboot快速入门 1.3 默认配置的原理 小结 1.4 springboot整合常用模块 整合spring ...

  2. 乐忧商城项目总结-2

    乐忧商城 6.商品分类 6.1 搭建后台管理的前端页面 6.2 Vuetify框架 6.3 使用域名访问本地项目 6.4 实现商品分类查询 7.品牌查询 8.品牌新增及fastDFS 8.1 品牌新增 ...

  3. 乐忧商城项目总结-3

    乐忧商城 10.商品管理 10.1 商品新增 10.2 商品修改 10.3 搭建前台系统 11.elasticsearch 11.1 elasticsearch介绍及其安装 11.2 操作索引 11. ...

  4. java学习day58(乐友商城)乐友商城项目搭建、SE6语法使用

    复习springCloud总结: 今日内容: 了解电商行业 了解乐优商城项目结构 能独立搭建项目基本框架 能参考使用ES6的新语法 1.了解电商行业 学习电商项目,自然要先了解这个行业,所以我们首先来 ...

  5. 商城项目介绍以及ES6的新语法

    0.学习目标 了解电商行业 了解乐优商城项目结构 能独立搭建项目基本框架 能参考使用ES6的新语法 1.了解电商行业 学习电商项目,自然要先了解这个行业,所以我们首先来聊聊电商行业 1.1.项目分类 ...

  6. day04-乐优商城项目搭建

    0.学习目标 了解电商行业 了解乐优商城项目结构 能独立搭建项目基本框架 能参考使用ES6的新语法 1.了解电商行业 学习电商项目,自然要先了解这个行业,所以我们首先来聊聊电商行业 1.1.项目分类 ...

  7. day01-乐优商城项目搭建

    0.学习目标 了解电商行业 了解乐优商城项目结构 能独立搭建项目基本框架 能参考使用ES6的新语法 1.了解电商行业 学习电商项目,自然要先了解这个行业,所以我们首先来聊聊电商行业 1.1.项目分类 ...

  8. 【javaWeb微服务架构项目——乐优商城day15】——会调用订单系统接口,实现订单结算功能,实现微信支付功能

    0.学习目标 会调用订单系统接口 实现订单结算功能 实现微信支付功能 源码笔记及资料: 链接:https://pan.baidu.com/s/1_opfL63P1pzH3rzLnbFiNw 提取码:v ...

  9. 乐优商城之项目搭建(四)

    文章目录 (一)项目分类 (二)电商行业 (三)专业术语 (四)项目介绍 (五)技术选型 (六)开发环境 (七)搭建后台环境:父工程 (八)搭建后台环境:eureka (九)搭建后台环境:zuul ( ...

最新文章

  1. Thinkpad SL400 issue
  2. python爬虫软件-Python爬虫工具篇 - 必会用的6款Chrome插件
  3. 2-Authentication Framework Chain of Trust
  4. 过滤器获取service方法返回慢_Gateway:自定义过滤器
  5. 内网通 去广告 代码_一文秒懂Facebook广告投放常见专业术语
  6. 模拟输入(ADC-A0)
  7. Optimizing Code with GCC
  8. loadrunner发送json_Loadrunner接口测试-发送JSON格式的请求
  9. SQL Server 加密层级
  10. python 帮助 autocad_python 使用pyautocad操作AutoCAD
  11. Ubuntu18.04编译pulseaudio14.x(八)
  12. Android--从相册中选取照片并返回结果
  13. 【三级网络技术】IP地址聚合考点
  14. 变更DirectX SDK版本-DirectX8升级DirectX9
  15. 速达3000 数据库备份文件分析
  16. ctrl+alt+方向键 与win7的旋转屏幕冲突解决
  17. 经济学人精读丨中国的电子商务
  18. JQuery实现灯箱特效
  19. [艾兰岛]编辑器做传送门——kura酱长期更新
  20. 英飞凌 DAVE™ 4.1.2 SDK 开发app学习笔记——什么是DAVE APP?

热门文章

  1. Jemalloc 深入分析 之 配对堆Pairing Heap
  2. 【转录调控网络】典型的基因转录调控网络推导方法——布尔网络
  3. python 经验模态分解_经验模态分解下的日内趋势交易策略 附源码
  4. cad延伸命令怎么用_CAD拉伸怎么用
  5. JS 倒计时展示小工具
  6. c++实现天干地支纪年法
  7. Unity Shader PostProcessing - 11 - Depth Fog/Height Fog - 雾效/深度雾效/高度雾/深度+高度雾
  8. Python创建数学动画
  9. 栅格数据像元大小0.000几的处理方法或重采样失败显示像元过大或者过小
  10. 迪赛智慧数——柱状图(象形动态图):不同性别消费者点外卖频率