作者介绍

蒋鹏程,苏州万店掌软件技术有限公司

前言

CloudCanal 近期提供了自定义代码构建宽表能力,我们第一时间参与了该特性内测,并已落地生产稳定运行。开发流程详见官方文档 《CloudCanal自定义代码实时加工》。

能力特点包括:

  • 灵活,支持反查打宽表,特定逻辑数据清洗,对账,告警等场景
  • 调试方便,通过任务参数配置自动打开 debug 端口,对接 IDE 调试
  • SDK 接口清晰,提供丰富的上下文信息,方便数据逻辑开发

本文基于我们业务中的实际需求(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,希望对大家有所帮助。

使用案例

案例一:商品表和SKU宽表行构建

业务背景

在对接用户的小程序进行商品搜索时,需要如下几个能力

  1. 基于分词的全文索引
  2. 同时搜索不同表中的字段

需要全文索引的初衷是希望用户搜索商品的关键词就可以搜索到想要的商品。这在传统数据库中一般支持的都比较弱甚至不支持,因此需要借助 ES 分词器搜索。

而第二个能力主要是由于业务数据通常分布在多个表中,但是 ES 并不能像需要关系型数据库那样联表查询,CloudCanal 自定义代码的能力则整号解决了我们多表关联的痛点。

业务流程

在使用 CloudCanal 总体的流程变得十分清晰,在 CloudCanal 层面通过订阅表结合自定义代码中的反查数据库以及数据处理,可以直接生成可以写到对端 ES 的宽表行。

表结构

准备的 mysql 表结构如下,一个商品会对应多个 SKU,我们在对端创建好索引,其中的 sku_detail 保存一个商品关联的 SKU 信息,是一个典型的一对多场景。

ES mapping 中的字段对应主表 tb_enterprise_goods 中字段,额外新增的 sku_detail 字段就是我们需要从子表 tb_enterprise_sku 中同步的数据。

## 商品表
CREATE TABLE `tb_enterprise_goods` (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(64) NOT NULL DEFAULT '' COMMENT '商品名称',`enterprise_id` int(11) NOT NULL DEFAULT '0' COMMENT '企业id',`goods_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商家商品编号',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9410 DEFAULT CHARSET=utf8mb4;
## SKU表
CREATE TABLE `tb_enterprise_sku` (`id` int(11) NOT NULL AUTO_INCREMENT,`enterprise_goods_id` int(11) NOT NULL COMMENT '企业商品id',`name` varchar(255) NOT NULL DEFAULT '' COMMENT 'sku{1:2,2:1}',`sku_no` varchar(255) DEFAULT '' COMMENT '商品sku编码',`scan_goods` varchar(255) CHARACTER SET utf8 NOT NULL DEFAULT '' COMMENT 'sku条形码',PRIMARY KEY (`id`),
) ENGINE=InnoDB AUTO_INCREMENT=14397 DEFAULT CHARSET=utf8mb4 COMMENT='企业 sku';

ES 索引如下:

      "enterprise_id": {"type": "integer"},"goods_no": {"type": "text","analyzer": "custom_e","fields": {"keyword": {"type": "keyword"}}},"id": {"type": "integer"},"name": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","fields": {"standard": {"type": "text","analyzer": "standard"},"keyword":{"type": "keyword"}},"fielddata": true},"sku_detail": {"type": "nested","properties": {"id": {"type": "integer"},"sku_no": {"type": "text","analyzer": "custom_e","fields": {"keyword": {"type": "keyword"}}},"scan_goods": {"type": "text","analyzer": "custom_e","fields": {"keyword": {"type": "keyword"}}}

注:为了方便大家理解,此处表字段进行了缩减

自定义代码工作流程

自定义代码源码

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {List<CustomRecord> customRecordList=new ArrayList<>();String idStr = (customRecord.getFieldMapAfter().get("id")).toString();List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));if (enterpriseSkuList.size() > 0) {Map<String, Object> addFieldValueMap = new LinkedHashMap<>();addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);}customRecordList.add(customRecord);return customRecordList;}public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {List<CustomRecord> customRecordList=new ArrayList<>();String idStr = (customRecord.getFieldMapAfter().get("id")).toString();List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));if (enterpriseSkuList.size() > 0) {Map<String, Object> addFieldValueMap = new LinkedHashMap<>();addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);}customRecordList.add(customRecord);return customRecordList;}private List<EnterpriseSku> tryQuerySourceDs(DataSource dataSource, Integer id) {try(Connection connection = dataSource.getConnection();PreparedStatement ps = connection.prepareStatement("select * from `live-mini`.tb_enterprise_sku where is_del=0 and enterprise_goods_id=" + id)) {ResultSet resultSet = ps.executeQuery();BeanListHandler<EnterpriseSku> bh = new BeanListHandler(EnterpriseSku.class);List<EnterpriseSku> enterpriseSkuList = bh.handle(resultSet);return enterpriseSkuList;} catch (Exception e) {esLogger.error(e.getMessage());return new ArrayList<>();}}

思路

customRecord 对象即自定义代码传入的参数,传入的 id 为子表 tb_enterprise_sku 的外键 enterprise_goods_id,查询出子表关于这个外键的所有数据,放入 addFieldValueMap 中,再利用源码提供的方法RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap),对 customRecord 进行加工。

创建任务步骤

新建源端对端数据源 选择订阅表及同步到对端的索引 选择同步字段,选择自定义包 完成创建任务

实现效果

{"_index" : "live-mini_pro_enterprise_goods_sku_view","_type" : "_doc","_id" : "17385","_score" : 12.033585,"_source" : {"img" : "https://ovopark.oss-cn-hangzhou.aliyuncs.com/wanji/2020-11-30/1606786889982.jpg","category_name" : "无类目","is_grounding" : 1,"del_time" : "2021-11-01T17:13:32+08:00","goods_no" : "","distribute_second" : 0.0,"uniform_proportion" : 0,"description" : "赠送私域直播流量转化平台万集&线上商城","video" : "","self_uniform_proportion" : 0,"update_time" : "2021-11-01T17:13:32+08:00","allocate_video" : null,"self_commission_properation" : 0.0,"category_id" : 0,"is_promote" : 0,"price" : 0.03,"is_distributor_self" : 0,"limit_purchases_max_quantity" : 0,"limit_purchases_type" : 0,"is_del" : 0,"is_distributor" : 0,"activity_price" : 0.0,"id" : 17385,"stock" : 0,"distribute_first" : 0.0,"is_distribution_threshold" : 0,"refund_configure" : 1,"create_time" : "2021-11-01T17:13:32+08:00","scan_goods" : "","limit_purchases_cycle" : 0,"is_sku" : 1,"allocate_mode" : 0,"sku_detail" : [{"scan_goods" : "","sku_no" : "","id" : "19943"}],"enterprise_id" : 24,"is_delivery" : 0,"is_limit_purchases" : 0,"name" : "测试商品测试商品测试商品测试商","goods_type" : 0,"goods_order" : 0,"ts" : "2021-11-01T17:16:42+08:00","delivery_price" : 0.0}}

案例二:订单表、商品表宽表构建

业务背景

小程序商城中需要展示猜你喜欢的商品,对猜你喜欢商品是根据用户购买商品的频率来决定,主要涉及订单表,订单商品表,用户表,商品表等,使用ES 查询同样面临多表无法 join 的问题,本案例中依然采用 CloudCanal 自定义代码同步为扁平化数据。

业务原使用技术及问题

同步 ES 的方案原先使用 logstash 的方式全量同步数据,由于数据量的问题,同步数据放在每日的凌晨,带来的问题为,数据同步不及时,并且只能是全量风险比较高。多次出现删除索引数据后并没有同步的情况。

表结构

CREATE TABLE `tb_order` (`id` int(11) NOT NULL AUTO_INCREMENT,`order_sn` varchar(32) NOT NULL COMMENT '订单编号',`user_id` int(11) NOT NULL COMMENT '用户 id',`user_name` varchar(255) DEFAULT NULL COMMENT '用户名称',`user_phone` varchar(11) DEFAULT NULL COMMENT '用户电话',`store_id` int(11) NOT NULL COMMENT '门店 id',`enterprise_id` int(11) DEFAULT '1' COMMENT '企业id',`order_type` int(11) NOT NULL COMMENT '0:快递配送;1:门店自取; 2:美团配送即时单; 3:美团即时配送预约单;',`order_status` tinyint(11) DEFAULT '0' COMMENT '原订单状态:1:未付款,3:待发货/待打包,5:(待收货/待取货),6:交易完成,7:订单失效,8:交易关闭, 13:用戶取消,18:商家强制关闭,19同意退款但是退款失敗(未用到),30:美团即时配送状态异常',`total_price` decimal(10,2) DEFAULT '0.00' COMMENT '订单总价',PRIMARY KEY (`id`,`total_goods_weight`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=18630 DEFAULT CHARSET=utf8mb4 COMMENT='订单表';CREATE TABLE `tb_order_goods` (`id` int(11) NOT NULL AUTO_INCREMENT,`user_id` int(11) NOT NULL COMMENT '用户 id',`order_id` int(11) NOT NULL COMMENT '订单 id',`goods_id` int(11) NOT NULL COMMENT '订单商品 id',`enterprise_goods_id` varchar(11) DEFAULT NULL COMMENT '企业商品id',`name` varchar(512) DEFAULT '' COMMENT '订单商品名称',`spec` varchar(100) DEFAULT NULL COMMENT '规格属性',`img` varchar(100) DEFAULT '' COMMENT '订单商品图片',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19159 DEFAULT CHARSET=utf8mb4 COMMENT='订单商品表';

ES 索引字段

"store_id":{"type": "integer"},"user_id":{"type": "integer"},"sex":{"type": "integer"},"birthday":{"type": "keyword"},"goods_name":{"type": "text","analyzer" : "ik_max_word","search_analyzer" : "ik_smart","fields": {"keyword":{"type": "keyword"}},"fielddata": true},"goods_type":{"type": "integer"},"order_goods_id":{"type": "integer"},"enterprise_goods_id":{"type": "integer"},"goods_price":{"type": "double"},"order_id":{"type": "integer"},"order_create_time":{"type": "date"}

注:ES表结构中涉及多张表,为了方便举例,这边只贴出2张表。es_doc展示纬度为订单商品纬度。

实现流程

订阅订单表 订阅字段 画出横线的即为需要同步的字段,有一个点需要特别注意:ES 中需要展示的字段一定要勾上同步,不勾上的话在自定义代码中 add 后 也不会被同步 官方给出的解释为字段黑白名单。 这里有几个细节点,订阅的表的维度并非 ES 存储数据的维度,所以这边的 id 并不是 ES 的 _id,对于这种需要在源端同步必须传的字段,设置对端字段可以随意设置一个对端已有的字段,在自定义代码中可以灵活的去重新配置需要同步的字段。(如果设置默认,ES 的 index 会创建出这个字段,这显然不是我们想要看到的效果)

业务流程

代码实现

查询扁平化数据

SELECTto2.store_id,tuc.id AS user_id,tuc.sex AS sex,tuc.birthday,tog.NAME AS goods_name,tog.goods_type,tog.goods_id AS order_goods_id,tog.goods_price,tog.create_time AS order_create_time,tog.id AS order_id,tog.enterprise_goods_id AS enterprise_goods_id
FROM`live-mini`.tb_order to2INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id AND tog.is_del = 0 AND to2.user_id = tog.user_idINNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id AND tuc.is_del = 0
WHEREto2.is_del = 0 AND to2.id= #{占位}
GROUP BY tog.id

思路:自定义代码获取 order 表的主键后,查询上面的 SQL,先将原 customRecord 中数据删除,再以查询出的结果维度新增数据。修改的逻辑亦如此。

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {List<CustomRecord> customRecordList=new ArrayList<>();String idStr = (customRecord.getFieldMapAfter().get("id")).toString();List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();if (orderGoodsList.size() > 0) {for (OrderGoods orderGoods:orderGoodsList){//添加需要的行和列Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());}}return customRecordList;}public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {List<CustomRecord> customRecordList=new ArrayList<>();String idStr = (customRecord.getFieldMapAfter().get("id")).toString();List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();if (orderGoodsList.size() > 0) {for (OrderGoods orderGoods:orderGoodsList){//添加需要的行和列Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());}}return customRecordList;}private List<OrderGoods> tryQuerySourceDs(DataSource dataSource, Integer id) {String sql="SELECT to2.store_id,tuc.id AS user_id,tuc.sex AS sex,tuc.birthday,tog.NAME AS goods_name,tog.goods_type,tog.goods_id AS order_goods_id,tog.goods_price,tog.create_time AS order_create_time,tog.id AS order_id,tog.enterprise_goods_id AS enterprise_goods_id FROM `live-mini`.tb_order to2 INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id  AND tog.is_del = 0  AND to2.user_id = tog.user_id INNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id AND tuc.is_del = 0  WHERE to2.is_del = 0  and to2.id=";try(Connection connection = dataSource.getConnection();PreparedStatement ps = connection.prepareStatement(sql + id+" GROUP BY tog.id")) {ResultSet resultSet = ps.executeQuery();BeanListHandler<OrderGoods> bh = new BeanListHandler(OrderGoods.class);List<OrderGoods> orderGoodsList = bh.handle(resultSet);return orderGoodsList;} catch (Exception e) {esLogger.error(e.getMessage());return new ArrayList<>();}}

实现效果

 {"_index" : "live-mini-order-pro","_type" : "_doc","_id" : "359","_score" : 1.0,"_source" : {"goods_type" : 0,"order_id" : 359,"order_goods_id" : 450,"order_create_time" : "2020-12-22T10:45:20.000Z","enterprise_goods_id" : 64,"goods_name" : "【老客户专享】万店掌2021新年定制台历","sex" : 2,"goods_price" : 1.0,"user_id" : 386,"store_id" : 1,"birthday" : ""}}

写在最后

CloudCanal 的自定义代码很好地解决了我们多表关联同步 ES 的问题,简洁易用的界面和有深度的功能都令人印象深刻,期待 CloudCanal 更多新能力。关于 CloudCanal 自定义代码的能力,也欢迎大家与我交流。

参与内测

CloudCanal 会不断提供一些预览的能力,包括新数据链路, 优化能力,功能插件。本文所描述的自定义代码能力目前也处于内测阶段。如需体验,可添加我们小助手(微信号:suhuayue001)进行了解和试用。

加入CloudCanal粉丝群掌握一手消息和获取更多福利,请添加我们小助手微信:suhuayue001

CloudCanal-免费好用的企业级数据同步工具,欢迎品鉴。 了解更多产品可以查看官方网站: http://www.clougence.com CloudCanal社区:https://www.askcug.com/

[用户分享]CloudCanal助力万店掌MySQL同步ES构建宽表场景相关推荐

  1. 实现MySQL同步数据到ES构建宽表

    作者介绍 Ceven,德勤乐融(北京)科技有限公司 邮箱:likailin@deqinyuerong.com 前言 CloudCanal 近期提供了自定义代码构建宽表能力,我们第一时间参与了该特性内测 ...

  2. canal实现mysql同步Elasticsearch数据linux中安装ELK

    linux中ELK数据同步 前言 ElasticSearch安装 下载安装包&&添加es用户 启动 ElasticSearch HEAD安装 下载&&安装 配置 设置跨 ...

  3. go-mysql-elasticsearch+mysql 同步 ElasticSearch(标贝科技)

    标贝科技 https://ai.data-baker.com/#/?source=qwer12 填写邀请码fwwqgs,每日免费调用量还可以翻倍 一.Elasticsearch:https://www ...

  4. 业界分享 | 美团到店综合知识图谱的构建与应用

    美团到店综合业务涵盖了本地生活中的休闲玩乐.丽人.亲子.结婚.宠物等多个行业.为了不断提升到店综合业务场景下的供需匹配效率,美团深入挖掘用户在本地生活中的多样化需求,构建了以用户需求节点为中心并链接商 ...

  5. 互联网日报 | 2月1日 星期一 | 苏宁宣布云网万店组织架构;威马汽车完成上市辅导;美团上线医疗健康品牌“百寿健康网”...

    今日看点 ✦ 威马汽车已完成上市辅导,拟登陆科创板 ✦ 苏宁宣布云网万店组织架构:集团总裁任峻担任总裁 ✦ 美团上线医疗健康品牌"百寿健康网",再次布局互联网医疗 ✦ 官方发声:P ...

  6. 互联网晚报 | 8月12日 星期四 | 苏宁易购零售云将迈入“万店时代”;理想汽车今日港股上市;好未来励步推素质教育新产品...

    今日看点 ✦ 国务院教育督导办:对各地落实"双减"情况建立半月通报制度 ✦ 恒大回应出售恒大汽车:考虑出售旗下部分资产,并非整体出售 ✦ 理想汽车:港股IPO募资约115.5亿港元 ...

  7. 国际化,“万店”瑞幸咖啡的新征程

    出品 | 何玺 排版 | 叶媛 瑞幸咖啡保持高速增长! 5月1日,瑞幸咖啡公布了2023年Q1财报.数据显示,瑞幸咖啡营收额.客户数.以及门店总数等核心数据,均呈现出强劲的上升势头. 01 瑞幸咖啡2 ...

  8. 经济内循环时代,我们需要更多的“云网万店”新物种

    苏宁又有大动作. 11月30日,苏宁易购宣布旗下"云网万店"与投资机构签订协议,完成总额高达60亿元的A轮融资. 01 "云网万店"是啥新物种? 对不了解&qu ...

  9. 用户分享率高达87% KilaKila恋爱小程序的女性营销逻辑

    8月4日,二次元兴趣社区KilaKila推出了暑期大型活动"KilaKila梦幻扭蛋季",活动期间推出的恋爱类微信小程序"一日Honey"上线不到一周,日活即达 ...

最新文章

  1. 一步一步教你如何用python做词云_一步一步教你如何用Python做词云
  2. faster rcnn源码解读总结
  3. 中南林科大c语言程序设计,2017年中南林业科技大学计算机与信息工程学院802C语言与数据结构之C程序设计考研题库...
  4. aspnet是前端还是后端_项目开发中无法回避的问题:前端和后端如何合作和并行工作?...
  5. 机器人码垛手持式编程_三分钟告诉你企业为什么要使用全自动码垛机械手!
  6. JAVA学习day10
  7. 基于 iso 镜像构建 yum 本地源
  8. IMPLEMENT_DYNCREATE(CFileView, CView)
  9. 怎样将手机屏幕投射到电脑
  10. 如何解决电脑横屏问题
  11. 列宽一字符等于多少厘米_Excel中行高与列宽单位和厘米的转换
  12. 如何屏蔽电脑微信自动更新
  13. 机体坐标系的角速度分量
  14. 2、使用FTP客户端连接FTP服务器
  15. 极客日报:《英雄联盟》回应服务器崩了:官方直接回退了旧版本;Deno 1.19 发布|极客头条
  16. 直播视频转换为文稿,分分钟就可以实现的方法来了
  17. 赵栋201771010137《面向对象程序设计(java)》第六周学习总结
  18. 传世基本架构-客户端(全局变量与总体执行流程)
  19. 弹幕调试 jquery.danmu.js
  20. 【原创】畅言实现单点登录的设计流程和技术细节(2/2)

热门文章

  1. 『往事』之---我的童年少年时代(续)
  2. scratch教程----4.科赫雪花新积木的运用
  3. 数据可视化之信息图表
  4. 2021级-JAVA07 常用类(字符串)
  5. Centos8.2云服务器下安装和卸载Java JDK
  6. stm31.js下拉菜单说明
  7. js 获取用户当前使用的 浏览器 判断是否谷歌浏览器
  8. 组件、子系统、包区别联系
  9. mysql关键字冲突解决办法_mysql 关键字 及关键字冲突解决办法
  10. 数据分析——泰坦尼克号预测