1.cancl安装

下载路径:cancl下载路径

下载完安装包,安装完成后,需要修改conf\example路径下配置文件instance.properties:设置position info和table meta tsdb info下面的属性即可。

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
#canal.instance.tsdb.enable= true
#canal.instance.tsdb.url= jdbc:mysql://127.0.0.1:3306/maruko?useUnicode=true&characterEncoding=utf-8&useSSL=false
#canal.instance.tsdb.dbUsername= root
#canal.instance.tsdb.dbPassword= maruko#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername= root
canal.instance.dbPassword= maruko
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

需要注意的是,只需要配置canal.instance.master.address,canal.instance.dbUsername,canal.instance.dbPassword即可,不要去配置canal.instance.tsdb相关属性,如果配置了启动会报错的。

2.依赖引入

引入依赖

        <!--canal--><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency>

3.监听配置

去实现EntryHandler接口,添加自己的业务逻辑,比如缓存的删除更新插入,实现对增删改查的逻辑重写。

@CanalTable("kafka_test")
@Component
@Slf4j
public class KafkaHandler implements EntryHandler<KafkaTest> {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void insert(KafkaTest item) {log.info("canal插入insert," + item);// 写数据到redisredisTemplate.opsForValue().set("kafka_test" + item.getId(), item);}@Overridepublic void update(KafkaTest before, KafkaTest after) {log.warn("更新前update before," + before);log.warn("更新后update after," + after);// 写数据到redisredisTemplate.opsForValue().set("kafka_test" + after.getId(), after);}@Overridepublic void delete(KafkaTest item) {log.warn("删除delete," + item);// 删除数据到redisredisTemplate.delete("kafka_test" + item.getId());}
}

4.yml配置

默认destination就是example,如果修改了服务安装里面的配置,这儿需要同步修改。

#cancl配置
canal:server: localhost:11111  #你canal的地址destination: example

如果不想让控制台一直打印某些信息,可以配置如下配置屏蔽AbstractCanalClient类process()一直打印this.log.info(“获取消息 {}”, message)。

logging:level:tracer: trace # 开启trace级别日志,在开发时可以开启此配置,则控制台可以打印es全部请求信息及DSL语句,为了避免重复,开启此项配置后,可以将EE的print-dsl设置为false.#top.javatool.canal.client: warn  #禁止AbstractCanalClient 打印常規日志 获取消息 {}

5.第二种方案(解决数据库存在下划线,用上述方法,某些字段会为空)

上面的方式只适合数据库字段和实体类字段,属性完全一致的情况;当数据库字段含有下划线的适合,因为我们直接去监听的binlog日志,里面的字段是数据库字段,因为跟实体类字段不匹配,所以会出现字段为空的情况,这个适合需要去获取列的字段,对字段进行属性转换,实现方法如下:

引入依赖

        <dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency>

创建监听

@CanalEventListener
@Slf4j
public class KafkaListener {@Autowiredprivate RedisTemplate redisTemplate;/*** @param eventType 当前操作数据库的类型* @param rowData   当前操作数据库的数据*/@ListenPoint(schema = "maruko", table = "kafka_test")public void listenKafkaTest(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {KafkaTest kafkaTestBefore = new KafkaTest();KafkaTest kafkaTestAfter = new KafkaTest();//遍历数据获取k-vList<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();getEntity(beforeColumnsList, kafkaTestBefore);log.warn("获取到提交前的对象为:" + kafkaTestBefore);getEntity(afterColumnsList, kafkaTestAfter);log.warn("获取到提交后的对象为:" + kafkaTestAfter);//判断是新增还是更新还是删除switch (eventType.getNumber()) {case CanalEntry.EventType.INSERT_VALUE:case CanalEntry.EventType.UPDATE_VALUE:redisTemplate.opsForValue().set("kafka_test" + kafkaTestAfter.getId(), kafkaTestAfter);break;case CanalEntry.EventType.DELETE_VALUE:redisTemplate.delete("kafka_test" + kafkaTestBefore.getId());break;}}/*** 遍历获取属性转换为实体类** @param columnsList* @param kafkaTest*/private void getEntity(List<CanalEntry.Column> columnsList, KafkaTest kafkaTest) {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");for (CanalEntry.Column column : columnsList) {String name = column.getName();String value = column.getValue();switch (name) {case KafkaTest.ID:if (StringUtils.hasLength(value)) {kafkaTest.setId(Integer.parseInt(value));}break;case KafkaTest.CONTENT:if (StringUtils.hasLength(value)) {kafkaTest.setContent(value);}break;case KafkaTest.PRODUCER_STATUS:if (StringUtils.hasLength(value)) {kafkaTest.setProducerStatus(Integer.parseInt(value));}break;case KafkaTest.CONSUMER_STATUS:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerStatus(Integer.parseInt(value));}break;case KafkaTest.UPDATE_TIME:if (StringUtils.hasLength(value)) {try {kafkaTest.setUpdateTime(format.parse(value));} catch (ParseException p) {log.error(p.getMessage());}}break;case KafkaTest.TOPIC:if (StringUtils.hasLength(value)) {kafkaTest.setTopic(value);}break;case KafkaTest.CONSUMER_ID:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerId(value);}break;case KafkaTest.GROUP_ID:if (StringUtils.hasLength(value)) {kafkaTest.setGroupId(value);}break;case KafkaTest.PARTITION_ID:if (StringUtils.hasLength(value)) {kafkaTest.setPartitionId(Integer.parseInt(value));}break;case KafkaTest.PRODUCER_OFFSET:if (StringUtils.hasLength(value)) {kafkaTest.setProducerOffset(Long.parseLong(value));}break;case KafkaTest.CONSUMER_OFFSET:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerOffset(Long.parseLong(value));}break;case KafkaTest.TEST:if (StringUtils.hasLength(value)) {kafkaTest.setTest(value);}break;}}}}

实体类

@Data
@TableName("kafka_test")
public class KafkaTest {public static final String ID = "id";public static final String CONTENT = "content";public static final String PRODUCER_STATUS = "producer_status";public static final String CONSUMER_STATUS = "consumer_status";public static final String UPDATE_TIME = "update_time";public static final String TOPIC = "topic";public static final String CONSUMER_ID = "consumer_id";public static final String GROUP_ID = "group_id";public static final String PARTITION_ID = "partition_id";public static final String PRODUCER_OFFSET = "consumer_offset";public static final String CONSUMER_OFFSET = "producer_offset";public static final String TEST = "test";@TableId(type = IdType.AUTO)private Integer id;@TableField("content")private String content;@TableField("producer_status")private Integer producerStatus;@TableField("consumer_status")private Integer consumerStatus;@TableField("update_time")private Date updateTime;@TableField("topic")private String topic;@TableField("consumer_id")private String consumerId;@TableField("group_id")private String groupId;@TableField("partition_id")private int partitionId;@TableField("consumer_offset")private Long consumerOffset;@TableField("producer_offset")private Long producerOffset;@TableField("test")private String test;
}

springboot集成canal,实现缓存实时刷新,驼峰问题相关推荐

  1. 【SpringBoot应用篇】SpringBoot集成j2cache二级缓存框架

    [SpringBoot应用篇]SpringBoot集成j2cache二级缓存框架 j2cache介绍 j2cache入门使用 pom application.yml caffeine.properti ...

  2. SpringBoot整合canal实现缓存更新

    canal是阿里巴巴的开源组件,用于监听MySQL的binlog日志而实现消息的同步机制,提供增量数据订阅和消费. canal必须基于MySQL的主从架构才可使用,canal会伪装成MySQL的一个s ...

  3. springboot心跳检测_springboot websocket 实时刷新 添加心跳机制(亲测可用版)

    思路 在我之前的一篇文章当中写到了websocket的实时刷新,但是有个问题没有解决,就是长时间没有数据的时候,这个连接就会自动断开,然后再次进行连接的话,需要再次进行连接.如果加入心跳机制的话,10 ...

  4. SpringBoot + RabbitMQ + canal实现缓存更新操作

    之前已经分享了一篇关于springboot + canal实现缓存更新的功能,但这种方案存在缺陷,当项目属于微服务架构时,一个服务可能有多个实例,即使多个实例都在监听canal的消息,但是只有一个实例 ...

  5. springboot项目更改代码后实时刷新问题

    在spring boot使用的过程中, 发现我修改了静态文件, 前台刷新后, 没有任何变化, 必须重新启动, 才能看到, 这简直不能让人接受. 那有什么方法来解决这个问题呢. Baidu之后, 得到了 ...

  6. Spring Boot学习总结(21)——SpringBoot集成Redis等缓存以注解的方式优雅实现幂等,防千万次重复提交实例代码

    前言 在实际的开发项目中,一个对外暴露的接口往往会面临很多次请求,我们来解释一下幂等的概念:任意多次执行所产生的影响均与一次执行的影响相同.按照这个含义,最终的含义就是 对数据库的影响只能是一次性的, ...

  7. SpringBoot 集成 layering-cache 实现两级缓存调研与实践

    前言 对于系统查多改少的数据,可以通过缓存来提升系统的访问性能.一般情况下我们会采用 Redis ,但是如果仅仅依赖 Redis 很容易出现缓存雪崩的情况.为了防止缓存雪崩可以通过 Redis 高可用 ...

  8. SpringBoot项目实现配置实时刷新功能

    需求描述:在SpringBoot项目中,一般业务配置都是写死在配置文件中的,如果某个业务配置想修改,就得重启项目.这在生产环境是不被允许的,这就需要通过技术手段做到配置变更后即使生效.下面就来看一下怎 ...

  9. springboot集成webSocket实现实时推送

    springboot集成webSocket实现实时推送 webSocket实现推送 webSocket是什么? 需求说明 websocket集成步骤 pom.xml webSocket实现 自定义处理 ...

最新文章

  1. centos7搭建svn,并用http访问
  2. 1024,千家公司程序员幸福指数大比拼!最“幸福”的程序员是你吗?
  3. hdu 5265(二分+枚举)
  4. ConcurrentHashMap的源码分析-treeifyBin
  5. python输出运行时间表_Python编程第十二课 了解程序运行的时间
  6. AutoCode For XML(XML解析代码生成器)发布
  7. SpringBoot 2 快速整合 | 统一异常处理
  8. 《恋上数据结构第1季》红黑树(未完)
  9. 历往游戏代码与当前DEADXSPACE项目最新进度。
  10. 隐马尔可夫模型通俗导论
  11. Excel VBA数组使用方法
  12. 141.环形链表(力扣leetcode)博主可答疑该问题
  13. MyBatis 安装下载 及入门案例
  14. LSTM调参经验(细读)
  15. python使用h5py读取mat文件数据
  16. Anaconda => PyCharm => CUDA => cudnn => PyTorch 环境配置
  17. vi打开文件提示Another program may be editing the same file
  18. bat文件刷屏,请规范命名
  19. 游戏测试工程师的光荣与梦想(一)-百炼成钢
  20. python 循序渐进学习:输出线段图案、正方形图案、直角三角形图案、翻转直角三角形图案、带空格直角三角形图案

热门文章

  1. 爬虫之旅(一):爬取b站首页的源代码
  2. 时间精力管理4象限法
  3. centos7常用快捷键
  4. 2018-3-25至2018-8-9的日语笔记
  5. DAX: 复购率计算
  6. SAP ABAP ZBA_R001 查询用户下的角色,事务代码
  7. PO模式项目实战思路分析
  8. 红米10xpro手机图纸
  9. RingAllreduce和NCCL
  10. 科学研究:青少年熬夜更易产生反社会行为,还会延缓大脑发育