实现增量数据索引

上一节中,我们为实现增量索引的加载做了充足的准备,使用到mysql-binlog-connector-java 开源组件来实现MySQL 的binlog监听,关于binlog的相关知识,大家可以自行网络查阅。或者可以mailto:magicianisaac@gmail.com

本节我们将根据binlog 的数据对象,来实现增量数据的处理,我们构建广告的增量数据,其实说白了就是为了在后期能把广告投放到索引服务,实现增量数据到增量索引的生成。Let's code.

  • 定义一个投递增量数据的接口(接收参数为我们上一节定义的binlog日志的转换对象)
/*** ISender for 投递增量数据 方法定义接口** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/
public interface ISender {void sender(MysqlRowData rowData);
}
  • 创建增量索引监听器
/*** IncrementListener for 增量数据实现监听** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>* @since 2019/6/27*/
@Slf4j
@Component
public class IncrementListener implements Ilistener {private final AggregationListener aggregationListener;@Autowiredpublic IncrementListener(AggregationListener aggregationListener) {this.aggregationListener = aggregationListener;}//根据名称选择要注入的投递方式@Resource(name = "indexSender")private ISender sender;/*** 标注为 {@link PostConstruct},* 即表示在服务启动,Bean完成初始化之后,立刻初始化*/@Override@PostConstructpublic void register() {log.info("IncrementListener register db and table info.");Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));}@Overridepublic void onEvent(BinlogRowData eventData) {TableTemplate table = eventData.getTableTemplate();EventType eventType = eventData.getEventType();//包装成最后需要投递的数据MysqlRowData rowData = new MysqlRowData();rowData.setTableName(table.getTableName());rowData.setLevel(eventData.getTableTemplate().getLevel());//将EventType转为OperationTypeEnumOperationTypeEnum operationType = OperationTypeEnum.convert(eventType);rowData.setOperationTypeEnum(operationType);//获取模版中该操作对应的字段列表List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);if (null == fieldList) {log.warn("{} not support for {}.", operationType, table.getTableName());return;}for (Map<String, String> afterMap : eventData.getAfter()) {Map<String, String> _afterMap = new HashMap<>();for (Map.Entry<String, String> entry : afterMap.entrySet()) {String colName = entry.getKey();String colValue = entry.getValue();_afterMap.put(colName, colValue);}rowData.getFieldValueMap().add(_afterMap);}sender.sender(rowData);}
}
开启binlog监听
  • 首先来配置监听binlog的数据库连接信息
adconf:mysql:host: 127.0.0.1port: 3306username: rootpassword: 12345678binlogName: ""position: -1 # 从当前位置开始监听

编写配置类:

/*** BinlogConfig for 定义监听Binlog的配置信息** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {private String host;private Integer port;private String username;private String password;private String binlogName;private Long position;
}

在我们实现 监听binlog那节,我们实现了一个自定义client CustomBinlogClient,需要实现binlog的监听,这个监听的客户端就必须是一个独立运行的线程,并且要在程序启动的时候进行监听,我们来实现运行当前client的方式,这里我们会使用到一个新的Runnerorg.springframework.boot.CommandLineRunner,let's code.

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {@Autowiredprivate CustomBinlogClient binlogClient;@Overridepublic void run(String... args) throws Exception {log.info("BinlogRunner is running...");binlogClient.connect();}
}
增量数据投递

在binlog监听的过程中,我们看到针对于int, String 这类数据字段,mysql的记录是没有问题的,但是针对于时间类型,它被格式化成了字符串类型:Fri Jun 21 15:07:53 CST 2019

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

对于这个时间格式,我们需要关注2点信息:

  • CST,这个时间格式会比我们的时间+ 8h(中国标准时间 China Standard Time UT+8:00)
  • 需要对这个日期进行解释处理

当然,我们也可以通过设置mysql的日期格式来改变该行为,在此,我们通过编码来解析该时间格式:

  /*** Thu Jun 27 08:00:00 CST 2019*/public static Date parseBinlogString2Date(String dateString) {try {DateFormat dateFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy",Locale.US);return DateUtils.addHours(dateFormat.parse(dateString), -8);} catch (ParseException ex) {log.error("parseString2Date error:{}", dateString);return null;}}

因为我们在定义索引的时候,是根据表之间的层级关系(Level)来设定的,根据代码规范,不允许出现Magic Number, 因此我们定义一个数据层级枚举,来表达数据层级。

/*** AdDataLevel for 广告数据层级** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/
@Getter
public enum AdDataLevel {LEVEL2("2", "level 2"),LEVEL3("3", "level 3"),LEVEL4("4", "level 4");private String level;private String desc;AdDataLevel(String level, String desc) {this.level = level;this.desc = desc;}
}
实现数据投递

因为增量数据可以投递到不同的位置以及用途,我们之前实现了一个投递接口com.sxzhongf.ad.sender.ISender,接下来我们实现一个投递类:

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {/*** 根据广告级别,投递Binlog数据*/@Overridepublic void sender(MysqlRowData rowData) {if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {Level2RowData(rowData);} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {Level3RowData(rowData);} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {Level4RowData(rowData);} else {log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));}}private void Level2RowData(MysqlRowData rowData) {if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {List<AdPlanTable> planTables = new ArrayList<>();for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {AdPlanTable planTable = new AdPlanTable();//Map的第二种循环方式fieldValueMap.forEach((k, v) -> {switch (k) {case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:planTable.setPlanId(Long.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:planTable.setUserId(Long.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:planTable.setPlanStatus(Integer.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));break;}});planTables.add(planTable);}//投递推广计划planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {List<AdCreativeTable> creativeTables = new LinkedList<>();rowData.getFieldValueMap().forEach(afterMap -> {AdCreativeTable creativeTable = new AdCreativeTable();afterMap.forEach((k, v) -> {switch (k) {case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:creativeTable.setAdId(Long.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:creativeTable.setType(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:creativeTable.setMaterialType(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:creativeTable.setHeight(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:creativeTable.setWidth(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:creativeTable.setAuditStatus(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:creativeTable.setAdUrl(v);break;}});creativeTables.add(creativeTable);});//投递广告创意creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));}}private void Level3RowData(MysqlRowData rowData) {...}/*** 处理4级广告*/private void Level4RowData(MysqlRowData rowData) {...}
}
投放增量数据到MQ(kafka)

为了我们的数据投放更加灵活,方便数据统计,分析等系统的需求,我们来实现一个投放到消息中的接口,其他服务可以订阅当前MQ 的TOPIC来实现数据订阅。

配置文件中配置TOPIC
adconf:kafka:topic: ad-search-mysql-data--------------------------------------
/*** KafkaSender for 投递Binlog增量数据到kafka消息队列** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>* @since 2019/7/1*/
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {@Value("${adconf.kafka.topic}")private String topic;@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 发送数据到kafka队列*/@Overridepublic void sender(MysqlRowData rowData) {kafkaTemplate.send(topic, JSON.toJSONString(rowData));}/*** 测试消费kafka消息*/@KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")public void processMysqlRowData(ConsumerRecord<?, ?> record) {Optional<?> kafkaMsg = Optional.ofNullable(record.value());if (kafkaMsg.isPresent()) {Object message = kafkaMsg.get();MysqlRowData rowData = JSON.parseObject(message.toString(),MysqlRowData.class);System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));//sender.sender();}}
}

转载于:https://www.cnblogs.com/zhangpan1244/p/11333229.html

[Spring cloud 一步步实现广告系统] 16. 增量索引实现以及投送数据到MQ(kafka)相关推荐

  1. [Spring cloud 一步步实现广告系统] 22. 广告系统回顾总结

    到目前为止,我们整个初级广告检索系统就初步开发完成了,我们来整体回顾一下我们的广告系统. 整个广告系统编码结构如下: 1.mscx-ad 父模块 主要是为了方便我们项目的统一管理 2.mscx-ad- ...

  2. [Spring cloud 一步步实现广告系统] 21. 系统错误汇总

    广告系统学习过程中问题答疑 博客园 Eureka集群启动报错 Answer 因为Eureka在集群启动过程中,会连接集群中其他的机器进行数据同步,在这个过程中,如果别的服务还没有启动完成,就会出现Co ...

  3. [Spring cloud 一步步实现广告系统] 19. 监控Hystrix Dashboard

    在之前的18次文章中,我们实现了广告系统的广告投放,广告检索业务功能,中间使用到了 服务发现Eureka,服务调用Feign,网关路由Zuul以及错误熔断Hystrix等Spring Cloud组件. ...

  4. [Spring cloud 一步步实现广告系统] 15. 使用开源组件监听Binlog 实现增量索引准备...

    MySQL Binlog简介 什么是binlog? 一个二进制日志,用来记录对数据发生或潜在发生更改的SQL语句,并以而进行的形式保存在磁盘中. binlog 的作用? 最主要有3个用途: 数据复制( ...

  5. [Spring cloud 一步步实现广告系统] 13. 索引服务编码实现

    上一节我们分析了广告索引的维护有2种,全量索引加载和增量索引维护.因为广告检索是广告系统中最为重要的环节,大家一定要认真理解我们索引设计的思路,接下来我们来编码实现索引维护功能. 我们来定义一个接口, ...

  6. [Spring cloud 一步步实现广告系统] 12. 广告索引介绍

    索引设计介绍 在我们广告系统中,为了我们能更快的拿到我们想要的广告数据,我们需要对广告数据添加类似于数据库index一样的索引结构,分两大类:正向索引和倒排索引. 正向索引 通过唯一键/主键生成与对象 ...

  7. [Spring cloud 一步步实现广告系统] 14. 全量索引代码实现

    上一节我们实现了索引基本操作的类以及索引缓存工具类,本小节我们开始实现加载全量索引数据,在加载全量索引数据之前,我们需要先将数据库中的表数据导出到一份文件中.Let's code. 1.首先定义一个常 ...

  8. [Spring cloud 一步步实现广告系统] 11. 使用Feign实现微服务调用

    上一节我们使用了Ribbon(基于Http/Tcp)进行微服务的调用,Ribbon的调用比较简单,通过Ribbon组件对请求的服务进行拦截,通过Eureka Server 获取到服务实例的IP:Por ...

  9. feign响应拦截_[Spring cloud 一步步实现广告系统] 11. 使用Feign实现微服务调用

    上一节我们使用了Ribbon(基于Http/Tcp)进行微服务的调用,Ribbon的调用比较简单,通过Ribbon组件对请求的服务进行拦截,通过Eureka Server 获取到服务实例的IP:Por ...

最新文章

  1. python利用自动识别写模块_Python 利用pytesser模块识别图像文字
  2. python实现后台系统的JWT认证
  3. 论文笔记:Inception v1
  4. 2.1.3 操作系统之原语实现对进程的控制
  5. ping 命令使用代理_网络检测知识篇:ping命令使用知识,你知道几点?
  6. mount 挂载光盘
  7. 配置数据库连接池的时候。
  8. prettyping.sh: ping 之美
  9. 2021年中国一次性防护服市场趋势报告、技术动态创新及2027年市场预测
  10. Windows系统安装运行库
  11. EASBOS获取系统状态控制期间
  12. 毕业设计-两轮自平衡小车主控板
  13. codeforces 69A(Young Physicist) Java
  14. 【转载】Ubuntu完全教程,让你成为Ubuntu高手!
  15. Day18 洛谷P1321 单词覆盖还原
  16. 前端学习周报(第一周)
  17. 【Python】数据分析——直方图、散点图、线性回归、多项式回归、拟合度
  18. 少儿编程课程和乐高机器人有什么不同
  19. 厦大2021届大一小学期C语言作业1 数组+字符串+指针+位操作
  20. python中turtle的用法及实例--你的唐僧哥哥

热门文章

  1. C#操作SqlServer数据库的常用对象,及其方法
  2. WPF里ItemsControl的分组实现
  3. python操作RabbitMQ
  4. OSI模型和TCP/IP模型
  5. 10.app后端选择什么开发语言
  6. 动态sql语句输出参数
  7. Python基础(1) - 初识Python
  8. 批量修改MSSQL架构名称
  9. 从csv文件批量创建AD用户,带源码。
  10. 各类JDBC数据库连接方式