易用的canaljava 客户端

canal 自身提供了简单的客户端,数据格式较为复杂,处理消费数据也不太方便,为了方便给业务使用,提供一种直接能获取实体对象的方式来进行消费才更方便。

先说一下实现的思路,首先canal 客户端的消息对象有两种,message 和 flatMessage,分别是普通的消息(protobuf格式)和消息队列的扁平消息(json格式),现在将这两种消息转化为我们直接使用的 model 对象,根据消息中的数据库表名称找到对应的的实体对象,那么如何根据数据库表名找到实体对象呢?

第一种方式,如果我们的实体对象都使用JPA 的 @Table注解来标识表和实体的对应关系,可以使用该注解来找到实体对象和表名的关系

第二种方式,可以使用自定义注解的来标注实体和表名的关系,为解耦各个表的处理,我们使用策略模式来封装各个表的增删改操作

canal 主要客户端类

ClientIdentity

canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

Canal 客户端类型

canal 客户端可以主要分以下几种类型

单一ip 直连模式

这种方式下,可以启动多个客户端,连接同一个canal 服务端,多个客户端只有一个client 工作,其他的可以作为冷备,当一个client的挂了,其他的客户端会有一个进入工作模式

缺点:连接同一个服务端,如果服务端挂了将导致不可用

多ip 模式

这种方式下,客户端连接多个canal服务端,一个客户端随机选择一个canal server 消费,当这个server 挂了,会选择另外一个进行消费

缺点:不支持订阅消费

zookeeper 模式

使用zookeeper来server,client 的状态,当两个canal server 连接zookeeper 后,

优先连接的节点作为 活跃节点,client从活跃节点消费,当server挂了以后,从另外一个节点消费

缺点:不支持订阅消费

消息 队列模式

canal 支持消息直接发送到消息队列,从消息队列消费,目前支持的有kafka 和rocketMq,这种方式支持订阅消费

canal 客户端实现

EntryHandler 实体消息处理器

首先定义一个策略接口,定义增加,更新,删除功能,使用java 8声明方法为default,让客户端选择实现其中的方法,提高灵活性,客户端实现EntryHandler接口后,会返回基于handler中的泛型的实例对象,在对应的方法中实现自定义逻辑

public interface EntryHandler {

default void insert(T t) {

}

default void update(T before, T after) {

}

default void delete(T t) {

}

}

定义一个canalClient 的抽象类,封装canal 的链接开启关闭操作,启动一个线程不断去消费canal 数据,依赖一个 messageHandler 封装消息处理的逻辑

public abstract class AbstractCanalClient implements CanalClient {

@Override

public void start() {

log.info("start canal client");

workThread = new Thread(this::process);

workThread.setName("canal-client-thread");

flag = true;

workThread.start();

}

@Override

public void stop() {

log.info("stop canal client");

flag = false;

if (null != workThread) {

workThread.interrupt();

}

}

@Override

public void process() {

if (flag) {

try {

connector.connect();

connector.subscribe(filter);

while (flag) {

Message message = connector.getWithoutAck(batchSize, timeout, unit);

log.info("获取消息 {}", message);

long batchId = message.getId();

if (message.getId() != -1 && message.getEntries().size() != 0) {

messageHandler.handleMessage(message);

}

connector.ack(batchId);

}

} catch (Exception e) {

log.error("canal client 异常", e);

} finally {

connector.disconnect();

}

}

}

}

基于该抽象类,分别提供各种客户端的实现

SimpleCanalClient

ClusterCanalClient

ZookeeperCanalClient

KafkaCanalClient

消息处理器 messageHandler

消息处理器 messageHandler 封装了消息处理逻辑,其中定义了一个消息处理方法

public interface MessageHandler {

void handleMessage(T t);

}

消息处理器可能要适配4种情况,分别是消费message,flatMessage和两种消息的同步与异步消费

消息处理的工作主要有两个

获取增删改的行数据,交给行处理器继续处理

在上下文对象中保存其他的数据,例如库名,表名,binlog 时间戳等等数据

首先我们封装一个抽象的 message 消息处理器,实现MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler {

@Override

public void handleMessage(Message message) {

List entries = message.getEntries();

for (CanalEntry.Entry entry : entries) {

if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {

try {

EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());

if(entryHandler!=null){

CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())

.executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();

CanalContext.setModel(model);

CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

List rowDataList = rowChange.getRowDatasList();

CanalEntry.EventType eventType = rowChange.getEventType();

for (CanalEntry.RowData rowData : rowDataList) {

rowDataHandler.handlerRowData(rowData,entryHandler,eventType);

}

}

} catch (Exception e) {

throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);

}finally {

CanalContext.removeModel();

}

}

}

}

}

分别定义两个实现类,同步与异步实现类,继承AbstractMessageHandler抽象类

public class SyncMessageHandlerImpl extends AbstractMessageHandler {

public SyncMessageHandlerImpl(List extends EntryHandler> entryHandlers, RowDataHandler rowDataHandler) {

super(entryHandlers, rowDataHandler);

}

@Override

public void handleMessage(Message message) {

super.handleMessage(message);

}

}

public class AsyncMessageHandlerImpl extends AbstractMessageHandler {

private ExecutorService executor;

public AsyncMessageHandlerImpl(List extends EntryHandler> entryHandlers, RowDataHandler rowDataHandler, ExecutorService executor) {

super(entryHandlers, rowDataHandler);

this.executor = executor;

}

@Override

public void handleMessage(Message message) {

executor.execute(() -> super.handleMessage(message));

}

}

RowDataHandler 行消息处理器

消息处理器依赖的行消息处理器主要是将原始的column list 转为 实体对象,并将相应的增删改消息交给相应的hangler对象方法,行消息处理器分别需要处理两种对象,一个是 message的行数据 和 flatMessage 的行数据

public interface RowDataHandler {

void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;

}

两个行处理器的实现为

public class RowDataHandlerImpl implements RowDataHandler {

private IModelFactory> modelFactory;

public RowDataHandlerImpl(IModelFactory modelFactory) {

this.modelFactory = modelFactory;

}

@Override

public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {

if (entryHandler != null) {

switch (eventType) {

case INSERT:

Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());

entryHandler.insert(object);

break;

case UPDATE:

Set updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)

.map(CanalEntry.Column::getName).collect(Collectors.toSet());

Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);

Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());

entryHandler.update(before, after);

break;

case DELETE:

Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());

entryHandler.delete(o);

break;

default:

break;

}

}

}

}

public class MapRowDataHandlerImpl implements RowDataHandler>> {

private IModelFactory> modelFactory;

public MapRowDataHandlerImpl(IModelFactory> modelFactory) {

this.modelFactory = modelFactory;

}

@Override

public void handlerRowData(List> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{

if (entryHandler != null) {

switch (eventType) {

case INSERT:

Object object = modelFactory.newInstance(entryHandler, list.get(0));

entryHandler.insert(object);

break;

case UPDATE:

Object before = modelFactory.newInstance(entryHandler, list.get(1));

Object after = modelFactory.newInstance(entryHandler, list.get(0));

entryHandler.update(before, after);

break;

case DELETE:

Object o = modelFactory.newInstance(entryHandler, list.get(0));

entryHandler.delete(o);

break;

default:

break;

}

}

}

}

IModelFactory bean实例创建工厂

行消息处理的依赖的工厂 主要是是通过反射创建与表名称对应的bean实例

public interface IModelFactory {

Object newInstance(EntryHandler entryHandler, T t) throws Exception;

default Object newInstance(EntryHandler entryHandler, T t, Set updateColumn) throws Exception {

return null;

}

}

CanalContext canal 消息上下文

目前主要用于保存bean实例以外的其他数据,使用threadLocal实现

代码已在github开源canal-client

canal java_易用的 canal java 客户端 canal-client相关推荐

  1. java利用canal监听数据库

    springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQ ...

  2. java day47【redis概念 、下载安装 、 命令操作 、持久化操作 、使用Java客户端操作redis】...

    第一章  Redis 1. 概念: redis是一款高性能的NOSQL系列的非关系型数据库 1.1.什么是NOSQL NoSQL(NoSQL = Not Only SQL),意即"不仅仅是S ...

  3. 电信网管中的Java客户端(二)

    电信网管中的Java客户端(二)<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com:office:office&quo ...

  4. TuGraph开源JAVA客户端工具TuGraph-OGM,无缝对接JAVA开发生态

    (预计阅读时间:7分钟) TuGraph 图数据库提供了 JAVA.C++.Python 等多种语言的 SDK 支持,方便客户在各种场景下使用.用户使用 SDK 向TuGraph服务器发送Cypher ...

  5. Sentinel简介与使用(Java客户端)

    一.Sentinel简介 官方中文文档:https://sentinelguard.io/zh-cn/docs/introduction.html github网址:https://github.co ...

  6. Memcached 集群环境Java客户端

    Memcached 集群环境Java客户端 学习了: http://blog.csdn.net/zhouzhiwengang/article/details/53154112 http://guazi ...

  7. 原生 Java 客户端进行消息通信

    原生 Java 客户端进行消息通信 Direct 交换器 DirectProducer:direct类型交换器的生产者 NormalConsumer:普通的消费者 MulitBindConsumer: ...

  8. 转载——Java与WCF交互(一):Java客户端调用WCF服务

    最近开始了解WCF,写了个最简单的Helloworld,想通过java客户端实现通信.没想到以我的基础,居然花了整整两天(当然是工作以外的时间,呵呵),整个过程大费周折,特写下此文,以供有需要的朋友参 ...

  9. Java与WCF交互(一):Java客户端调用WCF服务

    最近开始了解WCF,写了个最简单的Helloworld,想通过java客户端实现通信.没想到以我的基础,居然花了整整两天(当然是工作以外的时间,呵呵),整个过程大费周折,特写下此文,以供有需要的朋友参 ...

最新文章

  1. leetcode231
  2. 手把手玩转win8开发系列课程(11)
  3. Redis:Redis集群实战
  4. deque双向队列的使用
  5. rfc mail content-type
  6. 通过实现网站访问计数器带你理解 轻量级锁CAS原理,还学不会算我输!!!
  7. mysql 性能优化 20 条建议
  8. 精通ASP.NET Web程序测试
  9. 【原】创建 WPF 不规则窗口
  10. (转)技嘉 MA790FXT-UD5P搭配AMD X4 965超频解析
  11. Python UI自动化 编程(一) UIAutomation
  12. m3u8流媒体下载 swift
  13. 蓝筹股是什么意思?低估值蓝筹股有哪些?拥有蓝筹股的好处?
  14. AXI中的wrap burst
  15. 系统启动时启动服务器,在BOIS如何设置启动项 启动方式怎么选择
  16. docker-bridge如何通信
  17. win 10 硬盘安装 ubuntu 18.04
  18. 安卓手机上编程开发环境
  19. 如何使用 scp 将文件夹从远程复制到本地?
  20. Django之破解数独

热门文章

  1. ubuntu14.04如何更换阿里云源
  2. java开灯问题_C++之开灯问题(链表)
  3. Machine Learning学习计划
  4. 温故而知新 js 的错误处理机制
  5. MailBee.NET Objects接收电子邮件(POP3)教程一:接收简单的电子邮件
  6. localStorage本地存储数组、读取、修改、删除
  7. maven添加sqlserver的jdbc驱动包
  8. Linux多任务编程(三)---exec函数族及其基础实验
  9. 程序员的奋斗史(二)——今天你跑步了吗
  10. TEDxChengdu 演讲(瞬雨)