前言:

这是1张相对简单些报表示例,用sql语句也能实现,但主要是为了便于大家理解ES聚合实现报表的使用方法。

之前写过"java实现日报表、月报表统计,没数据补0"文章,https://blog.csdn.net/JRocks/article/details/113841913,方法是用sql语句查询数据库得到结果,然后通过java代码实现日期无数据自动补0,这种方法是非常简单方便。

但也有弊端,如有些数据展示用sql语句关联查询非常麻烦并且效率低下、或者说是sql语句实现不了、又或者sql+java代码实现也很麻烦、又或者是随着业务数据迅速增长,复杂的sql关联查询导致页面响应速度慢等等情况,所以考虑使用ES来做处理,一是可以提高查询效率,二是ES可以自动补0。

产品需求:

需求描述:

  • 首先大家可以理解为这是1张订单表,表中有“购买周期、购买来源”字典字段;
  • 这是1张年月日报表,同时筛选条件允许跨年、跨月、跨日,我们需要做的就是根据查询条件将数据从“合计、购买周期、购买来源”3个维度进行汇总,特别需要说明一点就是"人数"汇总字段,相同用户需要去重
  • 项目已经上线,所以既需要对历史数据进行推送ES处理,又需要对单笔订单购买进行数据推送ES处理;

思路步骤:很重要

  1. kibana中新建ES数据索引;
  2. 新建批量添加ES索引数据的方法;
  3. 历史数据初始化推送ES对应索引中;
  4. kibana中新建ES查询模板;
  5. kibana中新建ES汇总查询模板
    (注:也就是产品需求中表格下的最后一列合计相关字段内容,是显示当前查询条件下的所有汇总,不只是显示当前分页数据结果);
  6. 对ES查询模板的数据结果进行汇总返回前端JSON数据;
  7. 单笔订单购买进行数据推送ES处理;

注明:这是微服务项目,文章中提到的是2个module,其中关于ES相关的代码单独是1个module,称为dg-search,在代码实战中我会列出。

代码实战:按照思路步骤逐步展开

一:kibana中新建ES数据索引;
首先得搭建好ES,搭建过程这里不展开,kibana界面如下

# 1、新建保证金购买统计表索引
PUT dg_financial_order_report
{"mappings": {"properties": {"orderTime":{"type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis","fields": {"keyword": {"type": "keyword","ignore_above": 20}}},"totalType":{"type": "integer"},"deadline":{"type": "integer"},"orderType":{"type": "integer"},"qty":{"type": "long"},"orderMoney":{"type": "float"},"people":{"type": "long"}}}
}

二:新建批量添加ES索引数据的方法(dg-search项目)

// -------------------------------------------------------------------------------------------
// Feign接口
package com.dg.mall.search.feign.feign;@FeignClient(name = "dg-search", path = "/dg-search/api/bzjFinanceReportEs")
public interface BzjFinanceReportFeignService {/*** 批量新增保证金购买表数据*/@PostMapping("/batchAddOrUpdateDepositOrder")void batchAddOrUpdateDepositOrderEsMapping(@RequestBody List<DepositOrderMappingReq> list);/*** 保证金购买表数据查询* @param depositBucketsReq* @return*/@GetMapping("/listDepositOrder")List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);/*** 保证金购买表--统计金额* @param depositBucketsReq* @return*/@GetMapping("/getDepositOrderTotal")DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);
}// -------------------------------------------------------------------------------------------
// Feign接口实现类
@RestController
@RequestMapping("/api/bzjFinanceReportEs")
public class BzjFinanceReportFeignProvider implements BzjFinanceReportFeignService {@Autowired
private RestHighLevelClient restHighLevelClient;/**
* 批量新增保证金购买表数据
*/
@Overridepublic void batchAddOrUpdateDepositOrderEsMapping(List<DepositOrderMappingReq> list) {try {if (CollectionUtils.isEmpty(list)) {return;}BulkRequest bulkRequest = new BulkRequest();IndexRequest request = null;for (DepositOrderMappingReq req : list) {request = new IndexRequest("POST");request.index( "索引名"); // dg_financial_order_reportrequest.id(req.getId());request.source(JSON.toJSONString(req), XContentType.JSON);bulkRequest.add(request);}restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener);} catch (Exception e) {LogUtil.error("批量添加保证金存款表es数据失败! message:{} --", e, e.getMessage());throw new ServiceException(SearchExceptionEnum.BATCH_ADD_BZJ_ORDER_ES_FAIL);}}
}

实体类

/*** <p>*  保证金购买统计表--批量新增ES数据,请求实体* <p>*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositOrderMappingReq {/*** 存款ID*/private String id;/*** 购买日期*/private String orderTime;/*** 合计*/private Integer totalType = null;/*** 购买周期*/private Integer deadline = null;/*** 购买来源*/private Integer orderType = null;/*** 笔数*/private Long qty = 0L;/*** 金额*/private BigDecimal orderMoney = BigDecimal.ZERO;/*** 人数(已去重)*/private Long people = 0L;
}/*** <p>* 保证金购买统计表--ES数据查询请求实体* <p>*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositBucketsReq {/*** 开始时间*/private String gteTime;/*** 结束时间*/private String lteTime;/*** 统计类型(年月日)*/private String interval;/*** 时间格式化*/private String format;
}

三:历史数据初始化推送ES索引
说明:这里采用任务调度的方式执行(没有启用),便于操作数据初始化,只是上线之后手动执行一次该定时任务。

package com.dg.mall.financial.jobhandle;/*** <p>* 推送Es数据定时任务* <p>*/
@Service
public class SyncDepositOrderToEsHandle extends IJobHandler {@Resourceprivate DepositOrderService orderService;@Resourceprivate BzjFinanceReportFeignService bzjFinanceReportFeignService;@Resourceprivate DepositRollReportFeignService depositRollReportFeignService;@Resourceprivate FinancialProducer financialProducer;private static ThreadLocal<String> threadLocal = new ThreadLocal<>();@XxlJob("syncDepositOrderToEsHandle")@Overridepublic ReturnT<String> execute(String param) throws Exception {// 这里采用这种写法的目的是:执行任务调度后可以马上返回结果提示threadLocal.set(param);Thread thread = new Thread(new SyncDepositOrderToEsHandle.SynExecute());thread.start();Thread expiredThread = new Thread(new SyncDepositOrderToEsHandle.SynExpiredExecute());expiredThread.start();return ReturnT.SUCCESS;}class SynExecute implements Runnable {@Overridepublic void run() {//考虑数据量过大,进行分页推送数据,每次推送100条int current = 1, size = 100;for (; ; ) {PageVO pageVO = new PageVO<>(current, size);pageVO.setSearchCount(false);// 查询所有订单数据,按合计、购买来源、购买周期进行区分,见下面的mapping.xmlPageVO<DepositOrderMappingRes> res = orderService.getSyncPushOrderDataToEs(pageVO, threadLocal.get());if (ObjectUtil.isEmpty(res.getRecords()) || res.getRecords().size() < 1) {break;}final List<DepositOrderMappingReq> orderMappingReqs = Lists.newArrayList();for (DepositOrderMappingRes mappingRes : res.getRecords()) {DepositOrderMappingReq reportReq = orderService.batchAddOrderData(mappingRes);orderMappingReqs.add(reportReq);}if (CollectionUtils.isNotEmpty(orderMappingReqs)) {// 调用dg-search批量新增的ES的方法bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(orderMappingReqs);}current++;}}}
}

orderService.getSyncPushOrderDataToEs() 方法的mapping.xml

<select id="getSyncPushOrderDataToEs"resultType="com.dg.mall.financial.vo.res.deposit.DepositOrderMappingRes">-- 1、合计SELECTo.id AS id,DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,0 AS totalType,NULL AS deadline,NULL AS orderType,count( * ) AS qty,sum( o.order_money ) AS orderMoney,o.user_id AS userIdFROMdg_deposit_order oJOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_idWHERE<![CDATA[ o.order_status < 4 ]]><if test="param != null and param !=''"><![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]></if>GROUP BYorderTime,userIdUNION ALL-- 2、购买周期SELECTo.id AS id,DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,NULL AS totalType,dos.deadline AS deadline,NULL AS orderType,count( * ) AS qty,sum( o.order_money ) AS orderMoney,o.user_id AS userIdFROMdg_deposit_order oJOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_idWHERE<![CDATA[ o.order_status < 4 ]]><if test="param != null and param !=''"><![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]></if>GROUP BYorderTime,userId,deadlineUNION ALL-- 3、购买来源SELECTo.id AS id,DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,NULL AS totalType,NULL AS deadline,o.order_type AS orderType,count( * ) AS qty,sum( o.order_money ) AS orderMoney,o.user_id AS userIdFROMdg_deposit_order oJOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_idWHERE<![CDATA[ o.order_status < 4 ]]><if test="param != null and param !=''"><![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]></if>GROUP BYorderTime,userId,orderType</select>

四:kibana中新建ES查询模板
有小伙伴会问:为什么使用查询模板,它有什么好处呢?

  • 首先:我认为更好上手,可以提升你的成功自信心,“模板”从字面上讲就是给人一种套用,更简单这种感觉;
  • 第二:更直观、可以减少java代码量,让你更专心关注ES的聚合写法,如果你手动用java代码写过ES的复杂聚合,你会发现使用模板确实更方便好用;
# 保证金购买模板查询
GET /_search/template
{"id":"search_order_collect_template","params":{"gteTime":"2021-02-02","lteTime":"2021-02-02","interval":"day","format":"yyyy-MM-dd"}
}# 保证金购买模板
POST _scripts/search_order_collect_template
{"script": {"lang": "mustache","source": {"size": 0,"query": {"range": {"orderTime": {"gte": "{{gteTime}}","lte": "{{lteTime}}","format": "yyyy-MM-dd"}}},"aggs": {"group_date_histogram_data": {"date_histogram": {"field": "orderTime","calendar_interval": "{{interval}}","format": "{{format}}"},"aggs": {"group_totalType_data": {"terms": {"field": "totalType"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}},"group_orderType": {"terms": {"field": "orderType"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}},"group_deadline": {"terms": {"field": "deadline"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}}}}}}}
}

模板查询显示效果

java代码(dg-search项目)

 //feign接口/*** 保证金购买表数据查询* @param depositBucketsReq* @return*/@GetMapping("/listDepositOrder")List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);// 实现类@Overridepublic List<DepositOrderBucketsRes> listDepositOrder(DepositBucketsReq depositBucketsReq) {final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);String templateName = elasticsearchConfig.bzjConfig.getSearch_order_collect_template();LogUtil.info("模板名称:{}  请求参数为:{}", templateName, params.toString());SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,params, restHighLevelClient);Aggregations aggregations = searchResponse.getAggregations();// 和统计模板的不同之处ParsedDateHistogram dateHistogram = aggregations.get("group_date_histogram_data");if (dateHistogram.getBuckets().size() < 1) {return null;}return getDepositOrderCollect(dateHistogram.getBuckets());}private List<DepositOrderBucketsRes> getDepositOrderCollect(List<? extends Histogram.Bucket> buckets){List<DepositOrderBucketsRes> res = Lists.newLinkedList();buckets.stream().forEach(dateBucket -> {String dateString = dateBucket.getKeyAsString();DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().orderTime(dateString).build();Aggregations aggregations = dateBucket.getAggregations();// 获取合计聚合数据getOrderSumAggsData(aggregations, build);// 获取购买周期数据getOrderDeadlineAggsData(build, aggregations);// 获取购买来源数据getOrderSourceAggsData(aggregations, build);res.add(build);});return res;}/*** 获取合计聚合数据* @param aggregations* @param build*/public void getOrderSumAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {ParsedTerms totalTypeData = aggregations.get("group_totalType_data");List<? extends Terms.Bucket> totalTypeDataBuckets = totalTypeData.getBuckets();totalTypeDataBuckets.stream().forEach(bucket -> {Aggregations agg = bucket.getAggregations();ParsedSum sumQty = agg.get("qty");ParsedSum sumMoney = agg.get("orderMoney");ParsedSum sumPeople = agg.get("people");build.setSumQty(new BigDecimal(sumQty.getValueAsString()).intValue());build.setSumMoney(NumberUtil.round(new BigDecimal(sumMoney.getValueAsString()), BigDecimal.ROUND_CEILING).toString());build.setSumPeople(new BigDecimal(sumPeople.getValueAsString()).intValue());});}/*** 获取购买周期数据* @param build* @param aggregations*/private void getOrderDeadlineAggsData(DepositOrderBucketsRes build, Aggregations aggregations) {ParsedTerms deadlineData = aggregations.get("group_deadline");List<? extends Terms.Bucket> deadlineDataBuckets = deadlineData.getBuckets();deadlineDataBuckets.stream().forEach(bucket -> {Integer deadline = new BigDecimal(bucket.getKey().toString()).intValue();Aggregations agg = bucket.getAggregations();ParsedSum qty = agg.get("qty");ParsedSum money = agg.get("orderMoney");ParsedSum people = agg.get("people");DepositOrderDeadlineEnum deadlineEnum = DepositOrderDeadlineEnum.getDeadlineEnum(deadline);if (ObjectUtil.isEmpty(deadlineEnum)) {return;}switch (deadlineEnum) {case THIRTY_DAYS:build.setThirtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());build.setThirtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());build.setThirtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());break;case NINETY_DAYS:build.setNinetyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());build.setNinetyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());build.setNinetyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());break;case THREE_HUNDRED_SIXTY_DAYS:build.setThreeHundredSixtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());build.setThreeHundredSixtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());build.setThreeHundredSixtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());break;default:break;}});}/*** 获取购买来源数据* @param aggregations* @param build*/public void getOrderSourceAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {ParsedTerms sourceData = aggregations.get("group_orderType");List<? extends Terms.Bucket> sourceDataBuckets = sourceData.getBuckets();sourceDataBuckets.stream().forEach(bucket -> {Integer orderType = new BigDecimal(bucket.getKey().toString()).intValue();Aggregations agg = bucket.getAggregations();ParsedSum qty = agg.get("qty");ParsedSum money = agg.get("orderMoney");ParsedSum people = agg.get("people");DepositOrderSourceEnum orderSourceEnum = DepositOrderSourceEnum.getOrderSourceEnum(orderType);if (ObjectUtil.isEmpty(orderSourceEnum)) {return;}switch (orderSourceEnum) {case DEPOSIT:build.setDepositQty(new BigDecimal(qty.getValueAsString()).intValue());build.setDepositMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());break;// 这几种来源,统称为商店入驻case CHANNEL_STORE:case FIRM:case CONSIGNMENT_STORE:case IDLE_STORE:BigDecimal newQty = new BigDecimal(qty.getValueAsString());BigDecimal oldQty = new BigDecimal(build.getStoreQty());BigDecimal newMoney = new BigDecimal(money.getValueAsString());BigDecimal oldMoney = new BigDecimal(build.getStoreMoney());build.setStoreQty((oldQty.add(newQty)).intValue());build.setStoreMoney(NumberUtil.round(oldMoney.add(newMoney), BigDecimal.ROUND_CEILING).toString());break;default:break;}});}

五:kibana中新建ES汇总查询模板

# 保证金存款表统计模板查询
GET /_search/template
{"id":"search_order_total_template","params":{"gteTime":"2008-01-01","lteTime":"2021-12-31","interval":"day","format":"yyyy-MM-dd"}
}# 保证金存款表统计模板
POST _scripts/search_order_total_template
{"script": {"lang": "mustache","source": {"size": 0,"query": {"range": {"orderTime": {"gte": "{{gteTime}}","lte": "{{lteTime}}","format": "yyyy-MM-dd"}}},"aggs": {"group_totalType_data": {"terms": {"field": "totalType"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}},"group_orderType": {"terms": {"field": "orderType"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}},"group_deadline": {"terms": {"field": "deadline"},"aggs": {"qty": {"sum": {"field": "qty"}},"orderMoney": {"sum": {"field": "orderMoney"}},"people": {"sum": {"field": "people"}}}}}}}
}

模板查询显示效果

java代码(dg-search项目)

// feign接口
/*** 保证金购买表--统计金额* @param depositBucketsReq* @return*/@GetMapping("/getDepositOrderTotal")DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);// 实现类@Overridepublic DepositOrderBucketsRes getDepositOrderTotal(DepositBucketsReq depositBucketsReq) {final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);String templateName = elasticsearchConfig.bzjConfig.getSearch_order_total_template();LogUtil.info("模板名称:{}  请求参数为:{}", templateName, params.toString());SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,params, restHighLevelClient);Aggregations aggregations = searchResponse.getAggregations();DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().build();// 获取合计聚合数据(代码同上)getOrderSumAggsData(aggregations, build);// 获取购买周期数据(代码同上)getOrderDeadlineAggsData(build, aggregations);// 获取购买来源数据(代码同上)getOrderSourceAggsData(aggregations, build);return build;}

六:对ES查询模板的数据结果进行汇总返回前端JSON数据

Controller:

/*** 保证金购买统计表--查询*/@GetMapping("/order/list")public ResponseData listPageDepositOrderByReq(@Valid DepositCollectReq depositCollectReq){return ResponseData.success(financeReportService.listPageDepositOrderByReq(depositCollectReq));}/*** 保证金购买统计表--统计金额*/@GetMapping("/order/total")public ResponseData getDepositOrderTotal(@Valid DepositCollectReq depositCollectReq){return ResponseData.success(financeReportService.getDepositOrderTotal(depositCollectReq));}/*** 保证金购买统计表--导出excel*/@GetMapping("/order/export")public ResponseData getDepositOrderExportFile(@Valid DepositCollectReq depositCollectReq){return ResponseData.success(financeReportService.getDepositOrderExportFile(depositCollectReq));}

前端请求实体:

package com.dg.mall.financial.vo.req.report.collect;import com.dg.mall.core.page.PageVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;/*** <p>* 请求实体* <p>*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DepositCollectReq extends PageVO {/*** 报表类型(0:日,1:月,2:年)*/@NotNull(message = "报表类型不能为空!")private Integer reportType;/*** 开始时间*/@NotBlank(message = "开始日期不能为空!")private String gteTime;/*** 结束时间*/@NotBlank(message = "结束日期不能为空!")private String lteTime;
}

Service:

/*** 保证金购买统计表--分页查询* @param depositCollectReq* @return*/PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq);/*** 保证金购买统计表--导出excel* @param depositCollectReq* @return*/String getDepositOrderExportFile(DepositCollectReq depositCollectReq);/*** 保证金购买统计表--统计金额* @param depositCollectReq* @return*/DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq);

ServiceImpl:

 // 查询@Overridepublic PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq) {PageVO<DepositOrderBucketsRes> pageVO = new PageVO<>();// 查询条件转换成ES的请求实体类并添加默认参数DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);if (CollectionUtils.isEmpty(resList)){return pageVO;}List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());final PageUtils pageUtils = new PageUtils(Integer.valueOf(depositCollectReq.getCurrent() + ""), Integer.valueOf(depositCollectReq.getSize() + ""), collect);pageVO.setRecords(pageUtils.getCurrentList());pageVO.setTotal(pageUtils.getAllList().size());pageVO.setSize(depositCollectReq.getSize());pageVO.setCurrent(depositCollectReq.getCurrent());return pageVO;}//统计@Overridepublic DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq) {DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);return bzjFinanceReportFeignService.getDepositOrderTotal(depositBucketsReq);}// 导出excel(easyPoi)@Overridepublic String getDepositOrderExportFile(DepositCollectReq depositCollectReq) {DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);if (CollectionUtils.isEmpty(resList)){return null;}List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());final Map<String, Object> map = Maps.newHashMap();TemplateExportParams params = new TemplateExportParams("template/excel/导出保证金购买报表模板.xlsx");map.put("mapList", collect);Workbook workbook = ExcelExportUtil.exportExcel(params, map);return uploadUtils.uploadFile(workbook, "保证金购买报表");}//查询条件转换成ES的请求实体类并添加默认参数@Overridepublic DepositBucketsReq getDepositBucketsReq(DepositCollectReq depositCollectReq) {String gteTime = depositCollectReq.getGteTime();String lteTime = depositCollectReq.getLteTime();Integer type = depositCollectReq.getReportType();ReportDateEnum reportDateEnum = ReportDateEnum.getReportDateEnum(type);switch (reportDateEnum) {case MONTH_FORMAT: {gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();lteTime = DateUtil.endOfMonth(TimeUtils.getDateTime(lteTime, type)).toDateStr();break;}case YEAR_FORMAT: {gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();lteTime = DateUtil.endOfYear(TimeUtils.getDateTime(lteTime, type)).toDateStr();break;}default:break;}return DepositBucketsReq.builder().gteTime(gteTime).lteTime(lteTime).format(reportDateEnum.getFormat()).interval(reportDateEnum.getInterval()).build();}

逻辑分页:因为ES查询模板我们没有实现使用ES的from、size这种分页语法(可能可以,但我们没有实现),所以封装了一个逻辑分页类

package com.dg.mall.financial.utils;import com.google.common.collect.Lists;
import lombok.Data;import java.io.Serializable;
import java.util.List;/*** <p>* 逻辑分页工具* <p>*/
@Data
public class PageUtils implements Serializable {/*** 当前页*/private Integer current;/*** 每页数据*/private Integer size;/*** 当前页数据*/private List currentList;/*** 所有数据*/private List allList;public PageUtils(Integer current, Integer size, List allList) {final List arrayList = Lists.newArrayList();this.current = current;this.size = size;this.allList = allList;if (allList.size() <= size) {this.currentList = allList;return;}int start = (current - 1) * size;for (int i = 0; i < size; i++) {if (start + i >= allList.size()) {break;}arrayList.add(allList.get(start + i));}this.currentList = arrayList;}
}

七:单笔订单购买进行数据推送ES处理

注意2点:

  • 记得做人数统计的去重,这里使用的是redis;
  • req.setId(),这个Id记得做到唯一,因为这个id会存入ES索引数据中,而ES索引id相同,会做数据覆盖;
    注:我这里单条数据推送使用了消息队列,大家可以参照、也可以直接调用批量插入ES的方法;
 /*** 购买表数据推送es(区分购买周期和购买来源)** @param depositOrder*/public void sendOrderToEs(DepositOrder depositOrder) {// 合计 日期_类型_useridaddOrderByTotalTypeData(depositOrder);// 购买周期if (addOrderByDeadlineData(depositOrder)) return;// 购买来源addOrderBySourceData(depositOrder);}/*** 添加订单数据,合计、并对人数进行区分** @param depositOrder*/private void addOrderByTotalTypeData(DepositOrder depositOrder) {DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");// 人数去重String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(SUM_TYPE.getType()).concat("_").concat(depositOrder.getUserId());if (!redisTemplate.hasKey(key)) {req.setPeople(1L);redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);}req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(SUM_TYPE.getType()));req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));req.setTotalType(SUM_TYPE.getCode());req.setQty(1L);req.setOrderMoney(depositOrder.getOrderMoney());ReportReq reportReq = ReportReq.builder().shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ).data(JSON.toJSONString(req)).build();financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));}/*** 添加订单数据,根据购买周期区分** @param depositOrder* @return*/private boolean addOrderByDeadlineData(DepositOrder depositOrder) {DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");DepositOrderSubsidy orderSubsidy = depositOrderSubsidyService.getOne(new LambdaQueryWrapper<DepositOrderSubsidy>().eq(DepositOrderSubsidy::getOrderId, depositOrder.getOrderId()));if (ObjectUtil.isEmpty(orderSubsidy)) {return true;}// 人数去重String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(DEADLINE_TYPE.getType()).concat("_").concat(String.valueOf(orderSubsidy.getDeadline())).concat("_").concat(depositOrder.getUserId());if (!redisTemplate.hasKey(key)) {req.setPeople(1L);redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);}req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(DEADLINE_TYPE.getType()));req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));req.setDeadline(orderSubsidy.getDeadline());req.setQty(1L);req.setOrderMoney(depositOrder.getOrderMoney());ReportReq reportReq = ReportReq.builder().shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ).data(JSON.toJSONString(req)).build();financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));return false;}/*** 添加订单数据,根据购买来源区分** @param depositOrder*/private void addOrderBySourceData(DepositOrder depositOrder) {DepositOrderMappingReq req = DepositOrderMappingReq.builder().id(String.valueOf(depositOrder.getId()).concat("_").concat(ORDER_SOURCE_TYPE.getType())).orderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd")).orderType(depositOrder.getOrderType()).qty(1L).orderMoney(depositOrder.getOrderMoney()).build();ReportReq reportReq = ReportReq.builder().shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ).data(JSON.toJSONString(req)).build();financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));}/*** 发送消息队列** @param exchange* @param routingKey* @param content*/public void sendMessage(String exchange, String routingKey, Object content) {rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);CorrelationData correlationData = new CorrelationData(String.valueOf(IdWorker.getId()));rabbitTemplate.convertAndSend(exchange, routingKey, getMessage(content),message -> {message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);LogUtil.info("消息队列发送完成,消息为:" + content);}

消息队列–消费者:单条数据推送ES

package com.dg.mall.financial.rabbitmq.consumer;/*** <p>* 财务报表--消费* <p>*/
@Service
public class DepositFinancelReportConsumer {@Autowiredprivate BzjFinanceReportFeignService bzjFinanceReportFeignService;@Resourceprivate DepositRenewalReportFeignService depositRenewalReportFeignService;@Resourceprivate DepositRollReportFeignService depositRollReportFeignService;@Resourceprivate DepositAwardEsFeignService depositAwardEsFeignService;@Resourceprivate DepositFinanceReportService financeReportService;@Resourceprivate SubsidyAwardService subsidyAwardService;@Resourceprivate RedisTemplate redisTemplate;@RabbitListener(queues = "deposit-financel-report-queue")@RabbitHandler@Transactional(rollbackFor = Exception.class)public void financelReportQueue(Message msg, Channel channel) throws Exception {long deliveryTag = msg.getMessageProperties().getDeliveryTag();String resMsg = new String(msg.getBody(), "utf-8");try {// 根据报表类型区分ReportReq req = JSON.parseObject(resMsg, new TypeReference<ReportReq>() {});String shortName = req.getShortName();switch (shortName) {case ReportTypeConstants.FINANCIEL_BZJGMTJ:// 批量添加--保证金购买统计表es数据batchAddOrUpdateDepositOrderEsMapping(channel, deliveryTag, req);break;// ……default:throw new IllegalStateException("Unexpected value: " + shortName);}} catch (Exception e) {channel.basicNack(deliveryTag, false, true);throw new RuntimeException();}channel.basicAck(deliveryTag, true);}/*** 批量新增保证金存款数据(按"购买周期、购买来源"区分)** @param channel* @param deliveryTag* @param req* @throws IOException*/public void batchAddOrUpdateDepositOrderEsMapping(Channel channel, long deliveryTag, ReportReq req) throws IOException {DepositOrderMappingReq orderMappingReq = JSON.parseObject(req.getData(), new TypeReference<DepositOrderMappingReq>() {});if (ObjectUtil.isEmpty(orderMappingReq.getOrderTime())) {channel.basicAck(deliveryTag, true);return;}ArrayList<DepositOrderMappingReq> list = new ArrayList<>();list.add(orderMappingReq);bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(list);return;}
}

总结:
至此为止,功能就算全部完成了,对于我来说,重点就在于如何设计ES索引数据结构和查询模板的聚合使用。

Elasticsearch(简称ES)实现日报表、月报表、年报表统计,没数据补0相关推荐

  1. es统计mysql 报表_Elasticsearch(简称ES)实现日报表、月报表、年报表统计,没数据补0...

    前言: 这是1张相对简单些报表示例,用sql语句也能实现,但主要是为了便于大家理解ES聚合实现报表的使用方法. 之前写过"java实现日报表.月报表统计,没数据补0"文章,http ...

  2. java实现日报表、月报表统计,没数据补0

    产品需求: 1.日报表 2.月报表 需求点: 前端传日期或月份区间,当数据库中指定的日期或月份没数据时也需要界面显示,但领取人数和使用人数需要自动补齐0: 举例:2020-11-28.2020-11- ...

  3. 全文检索 Elasticsearch(简称es)

    全文检索 Elasticsearch 研究 1. ElasticSearch 介绍 1.1 介绍 **Elasticsearch**是一个基于Lucene库的搜索引擎.它提供了一个分布式.支持多租户的 ...

  4. JS获取日期(年/月/日/时/分/秒)以及完整格式转化(补0)

    var myDate = new Date(); myDate.getYear();        //获取当前年份(2位) myDate.getFullYear();    //获取完整的年份(4位 ...

  5. mysql查询补齐12个月_MySQL查询12个月数据,无数据补0

    1.数据库  和表就用自己的,下面是代码 select case month(shi_com_time) when '1' then sum(mileage) else 0 end as 一月份, c ...

  6. 搜索引擎之ElasticSearch(es)入门学习、ELK 和 beats

    好记星不如烂笔头,这里记录平时工作中用到的东西,不喜可以留言. 一.ElasticSearch为啥要用 ElasticSearch简称es是一个ElasticSearch是一个分布式,高性能.高可用. ...

  7. 玩转Redis-HyperLogLog统计微博日活月活

    <玩转Redis>系列文章主要讲述Redis的基础及中高级应用.本文是<玩转Redis>系列第[9]篇,最新系列文章请前往公众号"zxiaofan"查看,或 ...

  8. redis统计用户日活量_玩转Redis-HyperLogLog统计微博日活月活

    <玩转Redis>系列文章主要讲述Redis的基础及中高级应用.本文是<玩转Redis>系列第[9]篇,最新系列文章请前往公众号"zxiaofan"查看,或 ...

  9. 帆软报表如何实现开始时间-结束时间-时间间隔(时,日、周、月、年)分段显示数据

    帆软报表如何实现开始时间-结束时间-时间间隔(时,日.周.月.年)分段显示数据 1.效果展示 时间间隔:全部.时.日.周.月.年 按小时 按日 按周 按月 按年 如果这是你想要的,可以继续查看如何实现 ...

最新文章

  1. Android SystemTrace使用攻略
  2. linux 子网和广播地址异常
  3. 只能是数字、字母、-和_
  4. 从 ELK 到 EFK 演进
  5. SQL培训内容转之wantin6(收藏)
  6. 直接上干货!技术水平真的很重要!复习指南
  7. MongoDB的快速手动安装
  8. jQuery -gt; end方法的使用方法
  9. 模糊逻辑学习--建立Mamdani系统(GUI)
  10. qq企业邮箱 pop3服务器是什么意思,腾讯qq端口是什么? QQ企业邮箱POP3SMTP设置
  11. zen-cart修改 zencart 模板修改
  12. 一维搜索算法——黄金分割法原理与实现
  13. gwas snp 和_如何利用分子实验验证GWAS发现的SNP?
  14. openwrt网络设置
  15. 使用intel编译器编译WRF4.4
  16. Android:从assets资源目录下安装apk
  17. calcHist的使用
  18. oracle 中的 NVL2() 函数
  19. Linux Shell学习笔记:exit退出状态代码
  20. win7计算机文件夹选项在哪里,Win7文件夹选项不见了怎么办?

热门文章

  1. 【C# 单因素方差分析(One Way ANOVA)】
  2. 冶金物理化学复习 --- 湿法分离提纯过程
  3. 根据先序和中序(中序和后序)确定二叉树
  4. 服务器虚拟机控制台打不开,OpenStack虚拟机控制台打不开
  5. 响应式鲜花店预订网站织梦源码
  6. 大鱼吃小鱼算法java,瞬间高大上了!
  7. MFC透明位图显示函数
  8. MSI和MSI-X对比(五)
  9. 4个万兆光口+8个千兆combo光电复用口+16个千兆网口管理型万兆机架式工业级以太网交换机
  10. jqury实现异步文件上传