前言:

这是1张相对简单些报表示例,用sql语句也能实现,但主要是为了便于大家理解ES聚合实现报表的使用方法。

之前写过"java实现日报表、月报表统计,没数据补0"文章,https://blog.csdn.net/JRocks/article/details/113841913,方法是用sql语句查询数据库得到结果,然后通过java代码实现日期无数据自动补0,这种方法是非常简单方便。

但也有弊端,如有些数据展示用sql语句关联查询非常麻烦并且效率低下、或者说是sql语句实现不了、又或者sql+java代码实现也很麻烦、又或者是随着业务数据迅速增长,复杂的sql关联查询导致页面响应速度慢等等情况,所以考虑使用ES来做处理,一是可以提高查询效率,二是ES可以自动补0。

产品需求:

在这里插入图片描述

需求描述:

首先大家可以理解为这是1张订单表,表中有“购买周期、购买来源”字典字段;

这是1张年月日报表,同时筛选条件允许跨年、跨月、跨日,我们需要做的就是根据查询条件将数据从“合计、购买周期、购买来源”3个维度进行汇总,特别需要说明一点就是"人数"汇总字段,相同用户需要去重;

项目已经上线,所以既需要对历史数据进行推送ES处理,又需要对单笔订单购买进行数据推送ES处理;

思路步骤:很重要

kibana中新建ES数据索引;

新建批量添加ES索引数据的方法;

历史数据初始化推送ES对应索引中;

kibana中新建ES查询模板;

kibana中新建ES汇总查询模板

(注:也就是产品需求中表格下的最后一列合计相关字段内容,是显示当前查询条件下的所有汇总,不只是显示当前分页数据结果);

对ES查询模板的数据结果进行汇总返回前端JSON数据;

单笔订单购买进行数据推送ES处理;

注明:这是微服务项目,文章中提到的是2个module,其中关于ES相关的代码单独是1个module,称为dg-search,在代码实战中我会列出。

代码实战:按照思路步骤逐步展开

一:kibana中新建ES数据索引;

首先得搭建好ES,搭建过程这里不展开,kibana界面如下

在这里插入图片描述

# 1、新建保证金购买统计表索引

PUT dg_financial_order_report

{

"mappings": {

"properties": {

"orderTime":{

"type": "date",

"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 20

}

}

},

"totalType":{

"type": "integer"

},

"deadline":{

"type": "integer"

},

"orderType":{

"type": "integer"

},

"qty":{

"type": "long"

},

"orderMoney":{

"type": "float"

},

"people":{

"type": "long"

}

}

}

}

二:新建批量添加ES索引数据的方法(dg-search项目)

// -------------------------------------------------------------------------------------------

// Feign接口

package com.dg.mall.search.feign.feign;

@FeignClient(name = "dg-search", path = "/dg-search/api/bzjFinanceReportEs")

public interface BzjFinanceReportFeignService {

/**

* 批量新增保证金购买表数据

*/

@PostMapping("/batchAddOrUpdateDepositOrder")

void batchAddOrUpdateDepositOrderEsMapping(@RequestBody List list);

/**

* 保证金购买表数据查询

* @param depositBucketsReq

* @return

*/

@GetMapping("/listDepositOrder")

List listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

/**

* 保证金购买表--统计金额

* @param depositBucketsReq

* @return

*/

@GetMapping("/getDepositOrderTotal")

DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);

}

// -------------------------------------------------------------------------------------------

// Feign接口实现类

@RestController

@RequestMapping("/api/bzjFinanceReportEs")

public class BzjFinanceReportFeignProvider implements BzjFinanceReportFeignService {

@Autowired

private RestHighLevelClient restHighLevelClient;

/**

* 批量新增保证金购买表数据

*/

@Override

public void batchAddOrUpdateDepositOrderEsMapping(List list) {

try {

if (CollectionUtils.isEmpty(list)) {

return;

}

BulkRequest bulkRequest = new BulkRequest();

IndexRequest request = null;

for (DepositOrderMappingReq req : list) {

request = new IndexRequest("POST");

request.index( "索引名"); // dg_financial_order_report

request.id(req.getId());

request.source(JSON.toJSONString(req), XContentType.JSON);

bulkRequest.add(request);

}

restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener);

} catch (Exception e) {

LogUtil.error("批量添加保证金存款表es数据失败! message:{} --", e, e.getMessage());

throw new ServiceException(SearchExceptionEnum.BATCH_ADD_BZJ_ORDER_ES_FAIL);

}

}

}

实体类

/**

*

* 保证金购买统计表--批量新增ES数据,请求实体

*

*/

@Data

@Builder

@NoArgsConstructor

@AllArgsConstructor

public class DepositOrderMappingReq {

/**

* 存款ID

*/

private String id;

/**

* 购买日期

*/

private String orderTime;

/**

* 合计

*/

private Integer totalType = null;

/**

* 购买周期

*/

private Integer deadline = null;

/**

* 购买来源

*/

private Integer orderType = null;

/**

* 笔数

*/

private Long qty = 0L;

/**

* 金额

*/

private BigDecimal orderMoney = BigDecimal.ZERO;

/**

* 人数(已去重)

*/

private Long people = 0L;

}

/**

*

* 保证金购买统计表--ES数据查询请求实体

*

*/

@Data

@Builder

@NoArgsConstructor

@AllArgsConstructor

public class DepositBucketsReq {

/**

* 开始时间

*/

private String gteTime;

/**

* 结束时间

*/

private String lteTime;

/**

* 统计类型(年月日)

*/

private String interval;

/**

* 时间格式化

*/

private String format;

}

三:历史数据初始化推送ES索引

说明:这里采用任务调度的方式执行(没有启用),便于操作数据初始化,只是上线之后手动执行一次该定时任务。

在这里插入图片描述

package com.dg.mall.financial.jobhandle;

/**

*

* 推送Es数据定时任务

*

*/

@Service

public class SyncDepositOrderToEsHandle extends IJobHandler {

@Resource

private DepositOrderService orderService;

@Resource

private BzjFinanceReportFeignService bzjFinanceReportFeignService;

@Resource

private DepositRollReportFeignService depositRollReportFeignService;

@Resource

private FinancialProducer financialProducer;

private static ThreadLocal threadLocal = new ThreadLocal<>();

@XxlJob("syncDepositOrderToEsHandle")

@Override

public ReturnT execute(String param) throws Exception {

// 这里采用这种写法的目的是:执行任务调度后可以马上返回结果提示

threadLocal.set(param);

Thread thread = new Thread(new SyncDepositOrderToEsHandle.SynExecute());

thread.start();

Thread expiredThread = new Thread(new SyncDepositOrderToEsHandle.SynExpiredExecute());

expiredThread.start();

return ReturnT.SUCCESS;

}

class SynExecute implements Runnable {

@Override

public void run() {

//考虑数据量过大,进行分页推送数据,每次推送100条

int current = 1, size = 100;

for (; ; ) {

PageVO pageVO = new PageVO<>(current, size);

pageVO.setSearchCount(false);

// 查询所有订单数据,按合计、购买来源、购买周期进行区分,见下面的mapping.xml

PageVO res = orderService.getSyncPushOrderDataToEs(pageVO, threadLocal.get());

if (ObjectUtil.isEmpty(res.getRecords()) || res.getRecords().size() < 1) {

break;

}

final List orderMappingReqs = Lists.newArrayList();

for (DepositOrderMappingRes mappingRes : res.getRecords()) {

DepositOrderMappingReq reportReq = orderService.batchAddOrderData(mappingRes);

orderMappingReqs.add(reportReq);

}

if (CollectionUtils.isNotEmpty(orderMappingReqs)) {

// 调用dg-search批量新增的ES的方法

bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(orderMappingReqs);

}

current++;

}

}

}

}

orderService.getSyncPushOrderDataToEs() 方法的mapping.xml

resultType="com.dg.mall.financial.vo.res.deposit.DepositOrderMappingRes">

-- 1、合计

SELECT

o.id AS id,

DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,

0 AS totalType,

NULL AS deadline,

NULL AS orderType,

count( * ) AS qty,

sum( o.order_money ) AS orderMoney,

o.user_id AS userId

FROM

dg_deposit_order o

JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id

WHERE

GROUP BY

orderTime,

userId

UNION ALL

-- 2、购买周期

SELECT

o.id AS id,

DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,

NULL AS totalType,

dos.deadline AS deadline,

NULL AS orderType,

count( * ) AS qty,

sum( o.order_money ) AS orderMoney,

o.user_id AS userId

FROM

dg_deposit_order o

JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id

WHERE

GROUP BY

orderTime,

userId,

deadline

UNION ALL

-- 3、购买来源

SELECT

o.id AS id,

DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,

NULL AS totalType,

NULL AS deadline,

o.order_type AS orderType,

count( * ) AS qty,

sum( o.order_money ) AS orderMoney,

o.user_id AS userId

FROM

dg_deposit_order o

JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id

WHERE

GROUP BY

orderTime,

userId,

orderType

四:kibana中新建ES查询模板

有小伙伴会问:为什么使用查询模板,它有什么好处呢?

首先:我认为更好上手,可以提升你的成功自信心,“模板”从字面上讲就是给人一种套用,更简单这种感觉;

第二:更直观、可以减少java代码量,让你更专心关注ES的聚合写法,如果你手动用java代码写过ES的复杂聚合,你会发现使用模板确实更方便好用;

# 保证金购买模板查询

GET /_search/template

{

"id":"search_order_collect_template",

"params":{

"gteTime":"2021-02-02",

"lteTime":"2021-02-02",

"interval":"day",

"format":"yyyy-MM-dd"

}

}

# 保证金购买模板

POST _scripts/search_order_collect_template

{

"script": {

"lang": "mustache",

"source": {

"size": 0,

"query": {

"range": {

"orderTime": {

"gte": "{{gteTime}}",

"lte": "{{lteTime}}",

"format": "yyyy-MM-dd"

}

}

},

"aggs": {

"group_date_histogram_data": {

"date_histogram": {

"field": "orderTime",

"calendar_interval": "{{interval}}",

"format": "{{format}}"

},

"aggs": {

"group_totalType_data": {

"terms": {

"field": "totalType"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

},

"group_orderType": {

"terms": {

"field": "orderType"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

},

"group_deadline": {

"terms": {

"field": "deadline"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

}

}

}

}

}

}

}

模板查询显示效果

在这里插入图片描述

java代码(dg-search项目)

//feign接口

/**

* 保证金购买表数据查询

* @param depositBucketsReq

* @return

*/

@GetMapping("/listDepositOrder")

List listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

// 实现类

@Override

public List listDepositOrder(DepositBucketsReq depositBucketsReq) {

final Map params = (Map) JSON.toJSON(depositBucketsReq);

String templateName = elasticsearchConfig.bzjConfig.getSearch_order_collect_template();

LogUtil.info("模板名称:{} 请求参数为:{}", templateName, params.toString());

SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,

params, restHighLevelClient);

Aggregations aggregations = searchResponse.getAggregations();

// 和统计模板的不同之处

ParsedDateHistogram dateHistogram = aggregations.get("group_date_histogram_data");

if (dateHistogram.getBuckets().size() < 1) {

return null;

}

return getDepositOrderCollect(dateHistogram.getBuckets());

}

private List getDepositOrderCollect(List extends Histogram.Bucket> buckets){

List res = Lists.newLinkedList();

buckets.stream().forEach(dateBucket -> {

String dateString = dateBucket.getKeyAsString();

DepositOrderBucketsRes build = DepositOrderBucketsRes.builder()

.orderTime(dateString)

.build();

Aggregations aggregations = dateBucket.getAggregations();

// 获取合计聚合数据

getOrderSumAggsData(aggregations, build);

// 获取购买周期数据

getOrderDeadlineAggsData(build, aggregations);

// 获取购买来源数据

getOrderSourceAggsData(aggregations, build);

res.add(build);

});

return res;

}

/**

* 获取合计聚合数据

* @param aggregations

* @param build

*/

public void getOrderSumAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {

ParsedTerms totalTypeData = aggregations.get("group_totalType_data");

List extends Terms.Bucket> totalTypeDataBuckets = totalTypeData.getBuckets();

totalTypeDataBuckets.stream().forEach(bucket -> {

Aggregations agg = bucket.getAggregations();

ParsedSum sumQty = agg.get("qty");

ParsedSum sumMoney = agg.get("orderMoney");

ParsedSum sumPeople = agg.get("people");

build.setSumQty(new BigDecimal(sumQty.getValueAsString()).intValue());

build.setSumMoney(NumberUtil.round(new BigDecimal(sumMoney.getValueAsString()), BigDecimal.ROUND_CEILING).toString());

build.setSumPeople(new BigDecimal(sumPeople.getValueAsString()).intValue());

});

}

/**

* 获取购买周期数据

* @param build

* @param aggregations

*/

private void getOrderDeadlineAggsData(DepositOrderBucketsRes build, Aggregations aggregations) {

ParsedTerms deadlineData = aggregations.get("group_deadline");

List extends Terms.Bucket> deadlineDataBuckets = deadlineData.getBuckets();

deadlineDataBuckets.stream().forEach(bucket -> {

Integer deadline = new BigDecimal(bucket.getKey().toString()).intValue();

Aggregations agg = bucket.getAggregations();

ParsedSum qty = agg.get("qty");

ParsedSum money = agg.get("orderMoney");

ParsedSum people = agg.get("people");

DepositOrderDeadlineEnum deadlineEnum = DepositOrderDeadlineEnum.getDeadlineEnum(deadline);

if (ObjectUtil.isEmpty(deadlineEnum)) {

return;

}

switch (deadlineEnum) {

case THIRTY_DAYS:

build.setThirtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());

build.setThirtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());

build.setThirtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());

break;

case NINETY_DAYS:

build.setNinetyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());

build.setNinetyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());

build.setNinetyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());

break;

case THREE_HUNDRED_SIXTY_DAYS:

build.setThreeHundredSixtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());

build.setThreeHundredSixtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());

build.setThreeHundredSixtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());

break;

default:

break;

}

});

}

/**

* 获取购买来源数据

* @param aggregations

* @param build

*/

public void getOrderSourceAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {

ParsedTerms sourceData = aggregations.get("group_orderType");

List extends Terms.Bucket> sourceDataBuckets = sourceData.getBuckets();

sourceDataBuckets.stream().forEach(bucket -> {

Integer orderType = new BigDecimal(bucket.getKey().toString()).intValue();

Aggregations agg = bucket.getAggregations();

ParsedSum qty = agg.get("qty");

ParsedSum money = agg.get("orderMoney");

ParsedSum people = agg.get("people");

DepositOrderSourceEnum orderSourceEnum = DepositOrderSourceEnum.getOrderSourceEnum(orderType);

if (ObjectUtil.isEmpty(orderSourceEnum)) {

return;

}

switch (orderSourceEnum) {

case DEPOSIT:

build.setDepositQty(new BigDecimal(qty.getValueAsString()).intValue());

build.setDepositMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());

break;

// 这几种来源,统称为商店入驻

case CHANNEL_STORE:

case FIRM:

case CONSIGNMENT_STORE:

case IDLE_STORE:

BigDecimal newQty = new BigDecimal(qty.getValueAsString());

BigDecimal oldQty = new BigDecimal(build.getStoreQty());

BigDecimal newMoney = new BigDecimal(money.getValueAsString());

BigDecimal oldMoney = new BigDecimal(build.getStoreMoney());

build.setStoreQty((oldQty.add(newQty)).intValue());

build.setStoreMoney(NumberUtil.round(oldMoney.add(newMoney), BigDecimal.ROUND_CEILING).toString());

break;

default:

break;

}

});

}

五:kibana中新建ES汇总查询模板

# 保证金存款表统计模板查询

GET /_search/template

{

"id":"search_order_total_template",

"params":{

"gteTime":"2008-01-01",

"lteTime":"2021-12-31",

"interval":"day",

"format":"yyyy-MM-dd"

}

}

# 保证金存款表统计模板

POST _scripts/search_order_total_template

{

"script": {

"lang": "mustache",

"source": {

"size": 0,

"query": {

"range": {

"orderTime": {

"gte": "{{gteTime}}",

"lte": "{{lteTime}}",

"format": "yyyy-MM-dd"

}

}

},

"aggs": {

"group_totalType_data": {

"terms": {

"field": "totalType"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

},

"group_orderType": {

"terms": {

"field": "orderType"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

},

"group_deadline": {

"terms": {

"field": "deadline"

},

"aggs": {

"qty": {

"sum": {

"field": "qty"

}

},

"orderMoney": {

"sum": {

"field": "orderMoney"

}

},

"people": {

"sum": {

"field": "people"

}

}

}

}

}

}

}

}

模板查询显示效果

在这里插入图片描述

java代码(dg-search项目)

// feign接口

/**

* 保证金购买表--统计金额

* @param depositBucketsReq

* @return

*/

@GetMapping("/getDepositOrderTotal")

DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);

// 实现类

@Override

public DepositOrderBucketsRes getDepositOrderTotal(DepositBucketsReq depositBucketsReq) {

final Map params = (Map) JSON.toJSON(depositBucketsReq);

String templateName = elasticsearchConfig.bzjConfig.getSearch_order_total_template();

LogUtil.info("模板名称:{} 请求参数为:{}", templateName, params.toString());

SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,

params, restHighLevelClient);

Aggregations aggregations = searchResponse.getAggregations();

DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().build();

// 获取合计聚合数据(代码同上)

getOrderSumAggsData(aggregations, build);

// 获取购买周期数据(代码同上)

getOrderDeadlineAggsData(build, aggregations);

// 获取购买来源数据(代码同上)

getOrderSourceAggsData(aggregations, build);

return build;

}

六:对ES查询模板的数据结果进行汇总返回前端JSON数据

Controller:

/**

* 保证金购买统计表--查询

*/

@GetMapping("/order/list")

public ResponseData listPageDepositOrderByReq(@Valid DepositCollectReq depositCollectReq){

return ResponseData.success(financeReportService.listPageDepositOrderByReq(depositCollectReq));

}

/**

* 保证金购买统计表--统计金额

*/

@GetMapping("/order/total")

public ResponseData getDepositOrderTotal(@Valid DepositCollectReq depositCollectReq){

return ResponseData.success(financeReportService.getDepositOrderTotal(depositCollectReq));

}

/**

* 保证金购买统计表--导出excel

*/

@GetMapping("/order/export")

public ResponseData getDepositOrderExportFile(@Valid DepositCollectReq depositCollectReq){

return ResponseData.success(financeReportService.getDepositOrderExportFile(depositCollectReq));

}

前端请求实体:

package com.dg.mall.financial.vo.req.report.collect;

import com.dg.mall.core.page.PageVO;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;

import javax.validation.constraints.NotNull;

/**

*

* 请求实体

*

*/

@Data

@AllArgsConstructor

@NoArgsConstructor

public class DepositCollectReq extends PageVO {

/**

* 报表类型(0:日,1:月,2:年)

*/

@NotNull(message = "报表类型不能为空!")

private Integer reportType;

/**

* 开始时间

*/

@NotBlank(message = "开始日期不能为空!")

private String gteTime;

/**

* 结束时间

*/

@NotBlank(message = "结束日期不能为空!")

private String lteTime;

}

Service:

/**

* 保证金购买统计表--分页查询

* @param depositCollectReq

* @return

*/

PageVO listPageDepositOrderByReq(DepositCollectReq depositCollectReq);

/**

* 保证金购买统计表--导出excel

* @param depositCollectReq

* @return

*/

String getDepositOrderExportFile(DepositCollectReq depositCollectReq);

/**

* 保证金购买统计表--统计金额

* @param depositCollectReq

* @return

*/

DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq);

ServiceImpl:

// 查询

@Override

public PageVO listPageDepositOrderByReq(DepositCollectReq depositCollectReq) {

PageVO pageVO = new PageVO<>();

// 查询条件转换成ES的请求实体类并添加默认参数

DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);

List resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);

if (CollectionUtils.isEmpty(resList)){

return pageVO;

}

List collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());

final PageUtils pageUtils = new PageUtils(Integer.valueOf(depositCollectReq.getCurrent() + ""), Integer.valueOf(depositCollectReq.getSize() + ""), collect);

pageVO.setRecords(pageUtils.getCurrentList());

pageVO.setTotal(pageUtils.getAllList().size());

pageVO.setSize(depositCollectReq.getSize());

pageVO.setCurrent(depositCollectReq.getCurrent());

return pageVO;

}

//统计

@Override

public DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq) {

DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);

return bzjFinanceReportFeignService.getDepositOrderTotal(depositBucketsReq);

}

// 导出excel(easyPoi)

@Override

public String getDepositOrderExportFile(DepositCollectReq depositCollectReq) {

DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);

List resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);

if (CollectionUtils.isEmpty(resList)){

return null;

}

List collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());

final Map map = Maps.newHashMap();

TemplateExportParams params = new TemplateExportParams("template/excel/导出保证金购买报表模板.xlsx");

map.put("mapList", collect);

Workbook workbook = ExcelExportUtil.exportExcel(params, map);

return uploadUtils.uploadFile(workbook, "保证金购买报表");

}

//查询条件转换成ES的请求实体类并添加默认参数

@Override

public DepositBucketsReq getDepositBucketsReq(DepositCollectReq depositCollectReq) {

String gteTime = depositCollectReq.getGteTime();

String lteTime = depositCollectReq.getLteTime();

Integer type = depositCollectReq.getReportType();

ReportDateEnum reportDateEnum = ReportDateEnum.getReportDateEnum(type);

switch (reportDateEnum) {

case MONTH_FORMAT: {

gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();

lteTime = DateUtil.endOfMonth(TimeUtils.getDateTime(lteTime, type)).toDateStr();

break;

}

case YEAR_FORMAT: {

gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();

lteTime = DateUtil.endOfYear(TimeUtils.getDateTime(lteTime, type)).toDateStr();

break;

}

default:

break;

}

return DepositBucketsReq.builder()

.gteTime(gteTime)

.lteTime(lteTime)

.format(reportDateEnum.getFormat())

.interval(reportDateEnum.getInterval())

.build();

}

逻辑分页:因为ES查询模板我们没有实现使用ES的from、size这种分页语法(可能可以,但我们没有实现),所以封装了一个逻辑分页类

package com.dg.mall.financial.utils;

import com.google.common.collect.Lists;

import lombok.Data;

import java.io.Serializable;

import java.util.List;

/**

*

* 逻辑分页工具

*

*/

@Data

public class PageUtils implements Serializable {

/**

* 当前页

*/

private Integer current;

/**

* 每页数据

*/

private Integer size;

/**

* 当前页数据

*/

private List currentList;

/**

* 所有数据

*/

private List allList;

public PageUtils(Integer current, Integer size, List allList) {

final List arrayList = Lists.newArrayList();

this.current = current;

this.size = size;

this.allList = allList;

if (allList.size() <= size) {

this.currentList = allList;

return;

}

int start = (current - 1) * size;

for (int i = 0; i < size; i++) {

if (start + i >= allList.size()) {

break;

}

arrayList.add(allList.get(start + i));

}

this.currentList = arrayList;

}

}

七:单笔订单购买进行数据推送ES处理

注意2点:

记得做人数统计的去重,这里使用的是redis;

req.setId(),这个Id记得做到唯一,因为这个id会存入ES索引数据中,而ES索引id相同,会做数据覆盖;

注:我这里单条数据推送使用了消息队列,大家可以参照、也可以直接调用批量插入ES的方法;

/**

* 购买表数据推送es(区分购买周期和购买来源)

*

* @param depositOrder

*/

public void sendOrderToEs(DepositOrder depositOrder) {

// 合计 日期_类型_userid

addOrderByTotalTypeData(depositOrder);

// 购买周期

if (addOrderByDeadlineData(depositOrder)) return;

// 购买来源

addOrderBySourceData(depositOrder);

}

/**

* 添加订单数据,合计、并对人数进行区分

*

* @param depositOrder

*/

private void addOrderByTotalTypeData(DepositOrder depositOrder) {

DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();

String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");

// 人数去重

String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(SUM_TYPE.getType()).concat("_").concat(depositOrder.getUserId());

if (!redisTemplate.hasKey(key)) {

req.setPeople(1L);

redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);

}

req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(SUM_TYPE.getType()));

req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));

req.setTotalType(SUM_TYPE.getCode());

req.setQty(1L);

req.setOrderMoney(depositOrder.getOrderMoney());

ReportReq reportReq = ReportReq.builder()

.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)

.data(JSON.toJSONString(req)).build();

financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));

}

/**

* 添加订单数据,根据购买周期区分

*

* @param depositOrder

* @return

*/

private boolean addOrderByDeadlineData(DepositOrder depositOrder) {

DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();

String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");

DepositOrderSubsidy orderSubsidy = depositOrderSubsidyService.getOne(new LambdaQueryWrapper()

.eq(DepositOrderSubsidy::getOrderId, depositOrder.getOrderId()));

if (ObjectUtil.isEmpty(orderSubsidy)) {

return true;

}

// 人数去重

String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(DEADLINE_TYPE.getType()).concat("_").concat(String.valueOf(orderSubsidy.getDeadline())).concat("_").concat(depositOrder.getUserId());

if (!redisTemplate.hasKey(key)) {

req.setPeople(1L);

redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);

}

req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(DEADLINE_TYPE.getType()));

req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));

req.setDeadline(orderSubsidy.getDeadline());

req.setQty(1L);

req.setOrderMoney(depositOrder.getOrderMoney());

ReportReq reportReq = ReportReq.builder()

.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)

.data(JSON.toJSONString(req)).build();

financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));

return false;

}

/**

* 添加订单数据,根据购买来源区分

*

* @param depositOrder

*/

private void addOrderBySourceData(DepositOrder depositOrder) {

DepositOrderMappingReq req = DepositOrderMappingReq.builder()

.id(String.valueOf(depositOrder.getId()).concat("_").concat(ORDER_SOURCE_TYPE.getType()))

.orderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"))

.orderType(depositOrder.getOrderType())

.qty(1L)

.orderMoney(depositOrder.getOrderMoney())

.build();

ReportReq reportReq = ReportReq.builder()

.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)

.data(JSON.toJSONString(req)).build();

financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));

}

/**

* 发送消息队列

*

* @param exchange

* @param routingKey

* @param content

*/

public void sendMessage(String exchange, String routingKey, Object content) {

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(confirmCallback);

rabbitTemplate.setReturnCallback(returnCallback);

CorrelationData correlationData = new CorrelationData(String.valueOf(IdWorker.getId()));

rabbitTemplate.convertAndSend(exchange, routingKey, getMessage(content),

message -> {

message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

return message;

}, correlationData);

LogUtil.info("消息队列发送完成,消息为:" + content);

}

消息队列--消费者:单条数据推送ES

package com.dg.mall.financial.rabbitmq.consumer;

/**

*

* 财务报表--消费

*

*/

@Service

public class DepositFinancelReportConsumer {

@Autowired

private BzjFinanceReportFeignService bzjFinanceReportFeignService;

@Resource

private DepositRenewalReportFeignService depositRenewalReportFeignService;

@Resource

private DepositRollReportFeignService depositRollReportFeignService;

@Resource

private DepositAwardEsFeignService depositAwardEsFeignService;

@Resource

private DepositFinanceReportService financeReportService;

@Resource

private SubsidyAwardService subsidyAwardService;

@Resource

private RedisTemplate redisTemplate;

@RabbitListener(queues = "deposit-financel-report-queue")

@RabbitHandler

@Transactional(rollbackFor = Exception.class)

public void financelReportQueue(Message msg, Channel channel) throws Exception {

long deliveryTag = msg.getMessageProperties().getDeliveryTag();

String resMsg = new String(msg.getBody(), "utf-8");

try {

// 根据报表类型区分

ReportReq req = JSON.parseObject(resMsg, new TypeReference() {});

String shortName = req.getShortName();

switch (shortName) {

case ReportTypeConstants.FINANCIEL_BZJGMTJ:

// 批量添加--保证金购买统计表es数据

batchAddOrUpdateDepositOrderEsMapping(channel, deliveryTag, req);

break;

// ……

default:

throw new IllegalStateException("Unexpected value: " + shortName);

}

} catch (Exception e) {

channel.basicNack(deliveryTag, false, true);

throw new RuntimeException();

}

channel.basicAck(deliveryTag, true);

}

/**

* 批量新增保证金存款数据(按"购买周期、购买来源"区分)

*

* @param channel

* @param deliveryTag

* @param req

* @throws IOException

*/

public void batchAddOrUpdateDepositOrderEsMapping(Channel channel, long deliveryTag, ReportReq req) throws IOException {

DepositOrderMappingReq orderMappingReq = JSON.parseObject(req.getData(), new TypeReference() {

});

if (ObjectUtil.isEmpty(orderMappingReq.getOrderTime())) {

channel.basicAck(deliveryTag, true);

return;

}

ArrayList list = new ArrayList<>();

list.add(orderMappingReq);

bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(list);

return;

}

}

总结:

至此为止,功能就算全部完成了,对于我来说,重点就在于如何设计ES索引数据结构和查询模板的聚合使用。

es统计mysql 报表_Elasticsearch(简称ES)实现日报表、月报表、年报表统计,没数据补0...相关推荐

  1. Elasticsearch(简称ES)实现日报表、月报表、年报表统计,没数据补0

    前言: 这是1张相对简单些报表示例,用sql语句也能实现,但主要是为了便于大家理解ES聚合实现报表的使用方法. 之前写过"java实现日报表.月报表统计,没数据补0"文章,http ...

  2. java实现日报表、月报表统计,没数据补0

    产品需求: 1.日报表 2.月报表 需求点: 前端传日期或月份区间,当数据库中指定的日期或月份没数据时也需要界面显示,但领取人数和使用人数需要自动补齐0: 举例:2020-11-28.2020-11- ...

  3. mysql 怎么查询结果补0_mysql查询连续时间数据——无数据补0

    用D3来画统计图,数据是从mysql数据库中查出来的.mysql 统计图是关于某段时间内,每一年/每个月/每周的微博数量的折线图.数据表是一条微博为一条记录,每条记录有日期这个字段.sql 首先考虑用 ...

  4. MySQL按天统计一周没有数据补0

    背景:统计图表所需数据,当天没有数据时需要补0 思路:关联查询(所需时间列表与统计结果) 步骤: 第一步:获取一周的日期列表 SELECT @cdate := DATE_ADD(@cdate, INT ...

  5. sqlserver查询补全时间_mssql 按日期分组(group by)查询统计的时候,没有数据补0的解决办法...

    摘要: 下文讲述一次报表制作的需求, 需制作一个月的销量的数据汇总,如果其中某一天没有数据,那么就补0处理 例: /* 统计2018-4月份的销量统计, 无数据的天补0 */ ---建立基础数据 cr ...

  6. MySQL时间段查询,无数据补0

    原文地址: http://www.cnblogs.com/LUA123/p/6155414.html 上一节提到分时间段统计,可是无数据的时候不显示,而此时我们需要让他显示0. 首先我们需要建一个时间 ...

  7. mysql查询补齐12个月_MySQL查询12个月数据,无数据补0

    1.数据库  和表就用自己的,下面是代码 select case month(shi_com_time) when '1' then sum(mileage) else 0 end as 一月份, c ...

  8. mysql统计分析,无数据补0

    如题 这个分两种情况: 不需要补0 -- 查询最近七天工单 select date_format(createTime,"%Y-%m-%d") as days,count(*) a ...

  9. mysql 取当天、昨天、上一个月、当前月和下个月数据

    留个记录.. 获取当前月数据 SELECT *FROM 表 WHERE DATE_FORMAT(字段名,'%Y%m')=DATE_FORMAT(CURDATE(),'%Y%m) SELECT *FRO ...

最新文章

  1. java调用存储过程 oracle_java调用oracle存储过程
  2. 众推平台架构——分布式爬虫
  3. ST17H26只pwm波形特征
  4. hdu 5327 Olympiad
  5. nodejs cluster ip hash_redis集群架构了解一下?一致性hash了解吗?
  6. EIGRP负载均衡实验(如有疑问,请留言)
  7. python机器学习案例系列教程——文档分类器,朴素贝叶斯分类器,费舍尔分类器
  8. jquery 加载提示框
  9. java茌首字母_汉字获取拼音首字母(1)
  10. dos下查看shal值和Md5步骤
  11. 2006中秋节短信,最新中秋节祝福短信
  12. 注册机偷懒写法1、之直接扣代码
  13. Java 生成N位随机数的方法
  14. 滴滴打车2015-2016
  15. 老板掌控公司方向的三张图
  16. Linux中磁盘分区清理方法
  17. [].shift.call( arguments ) 和 [].slice.call( arguments ) 解释
  18. 三十七度的夏天挤公车
  19. The 2021 Sichuan Provincial 四川省赛 L. Spicy Restaurant(多源bfs)
  20. 关于做PDF的FAQ(一)~(四) 1

热门文章

  1. Windows 7 ship party
  2. python(十):模块相关、操作Redis、操作Excel
  3. Linux_基础_磁盘管理
  4. 在Visual Studio 2017中找不到.NET Framework 4.6.2
  5. EJB的beans们
  6. Python全栈开发之11、进程和线程
  7. JavaScript快速入门(三)——JavaScript语句
  8. 初识php的笔记(基础知识)
  9. 匿名管道 c++实现
  10. AndroidStudio_开发工具的设置_快捷键设置_编辑器设置---Android原生开发工作笔记72