下载安装使用Canal

  • 下载安装使用Canal
    • 下载
    • 数据库配置
    • 解压Canal
    • 以Springboot项目简单使用canal

下载安装使用Canal

下载

  • 下载地址:https://github.com/alibaba/canal/releases
  • github你懂的龟速下载,如果有需要可以通过我的链接下载1.1.3版本
  1. 下载地址:https://pan.baidu.com/s/1ZLyRCB72GDW_fs-0BVU40A
  2. 提取码:9met

数据库配置

  • 查看是否开启bin-log:show variables like 'log_bin';(默认是未开启的需要配置)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF    |
+---------------+-------+
1 row in set (0.00 sec)
  • 开启binlog-配置bin-log
  1. vi /etc/my.cnf(这里的一般数据库默认安装是这里)
  2. 在my.cnf的 [mysqld] 标签下添加以下配置:
 [mysqld] log-bin = mysql-bin #binlog文件名  binlog_format = ROW #选择row模式server_id = 1
  1. 重启mysql
    使用 service 启动:service mysqld start
    使用 service 停止:service mysqld stop
    使用 service 重启:service mysqld restart
  2. 开启root用户远程访问权限
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456';
flush privileges;
  1. 检查binlog的开启
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

value值为ON表示已开启

解压Canal

  1. 将canal压缩文件解压到/usr/local/canal
    mkdir /usr/local/canal
    讲文件放到该目录下
    解压:tar -zxvf <tab>
    进入conf/example/instance.properties文件修改参数
#在position info中写上
canal.instance.master.address = 127.0.0.1:3306  #自己的ip和数据库端口号#在username/password下写上
canal.instance.dbUsername = root #数据库用户,该用户需要可以远程访问的
canal.instance.dbPassword = 123456 #密码
  1. 进到bin目录启动canal
./startup.sh
  • tips:远程访问的端口是:11111
    可以使用telnet <ip> <port>测试端口是否能访问。如果不能访问通常是因为防火墙未关闭,可关闭防火墙和开放端口
firewall-cmd --state #查看运行状态
#开放1024的端口
firewall-cmd --add-port=1024/tcp --permanent
#重载生效刚才的端口设置
firewall-cmd --reload

以Springboot项目简单使用canal

  • maven依赖
<!--   版本自选  --><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency></dependencies>
  • Application.yml
server:port: 10001
spring:application:name: canal-clientprofiles:active: devdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/canal?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8username: rootpassword: 123456
  • 启动类
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}@Overridepublic void run(String... args) throws Exception {canalClient.run();}
}
  • Canal的client类
@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {//这里canal默认端口号是11111CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1"/*这里是ip*/,11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库* @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}

下载安装使用Canal相关推荐

  1. Redis学习之路(一)--下载安装redis

    redis学习之路--下载安装redis windows安装redis 1.下载redis 2.安装 3.查看是否安装成功 windows安装redis 1.下载redis 网址:https://gi ...

  2. git 下载 安装

    1.下载Git,官网地址:https://git-scm.com/,进入官网首页 在右下方的显示器中找到最新的版本下载,点击下载,跳转到下载页面 下载完成 2.安装Git 双击刚刚下载完成的安装文件, ...

  3. Docker初学5:下载安装可视化图形工具Portainer

    下载安装可视化图形工具Portainer # 搜索Portainer [root@iZh40ti53pk77iZ ~]# docker search portainer NAME DESCRIPTIO ...

  4. 01-01java概述 doc命令、jdk\jre下载安装、path、classpath配置、开发中常见小问题

    1:计算机概述(了解) (1)计算机(2)计算机硬件(3)计算机软件系统软件:window,linux,mac应用软件:qq,yy,飞秋(4)软件开发(理解)软件:是由数据和指令组成的.(计算器)开发 ...

  5. Kali Linux攻防系统(一:攻防系统Kali Linux下载安装与更新)

    任务一:攻防系统Kali Linux下载安装与更新 1.1.安装Kali Linux虚拟机 1.1.1.电脑硬件配置至少达到 CPU 内存 存储 >四核 >4G >20G 1.1.2 ...

  6. 平板电脑安装软件_题宝典软件升级了,微信公众号版不受影响,电脑版/手机APP/平板APP需要重新下载安装...

    亲爱的小伙伴们 大家好 题宝典软件升级了 那我们应该升级题库软件呢? 我们来一起看看 一.微信公众号版 进入步骤 关注本公众号(tbd339),点击菜单栏的"做题中心",如下图,然 ...

  7. windows10 下载 安装 使用 Sox

    windows10 下载 安装 使用 Sox 官网 http://sox.sourceforge.net/ 下载地址 https://sourceforge.net/projects/sox/file ...

  8. windows10中git 的下载安装

    git下载安装 下载网址: https://pc.qq.com/detail/13/detail_22693.html 双击安装 安装

  9. Sublime Text 3便携版下载安装和常用插件安装--顺便解决报错An error occured installing和no packages available for install

    文章目录 Sublime Text 3便携版下载安装和常用插件安装 1.Sublime Text 3便携版下载: 2.sublime 插件控制器(Package Control)安装 2.1离线安装P ...

最新文章

  1. 英特尔诺基亚将联手开发智能手机
  2. 蓝牙小电池图标_丽声小百科 | 乐趣助听器如何连接iPhone手机?
  3. 周期三角波傅里叶级数例题_傅里叶详解之傅里叶级数
  4. MFC模态与非模态对话框的创建与销毁
  5. UE4的编译配置详解
  6. css 平行四边形 梯形 组合_微课|人教版五年级数学上册6.4组合图形的面积(P99)...
  7. 两间三层小型别墅图片_占地仅120平的现代风别墅,带KTV和健身房,引领时尚新潮流...
  8. 对HGE游戏引擎的一次封装
  9. ADC采样间隔问题+TRGO作为ADC的触发源头
  10. 解决“在上下文中找不到 owin.Environment 项”
  11. TCP header
  12. svn linux clean up,SVN清理失败 (svn cleanup) 的解决方法
  13. 富士相机设置传原图_「富士相机」机身设置分享,摄影小白也可以拍摄到胶片感的照片...
  14. 余姚计算机编程培训,余姚编程软件培训
  15. 爬虫入门(简单网页信息爬取)
  16. 为什么计算机网络使用数字信号,什么是数字信号
  17. 大学生网课答案查询公众号搭建教程
  18. CSAPP:第二章——信息的表示和处理
  19. 什么是前置审批许可、后置审批许可?
  20. VS2016 发布项目提示 CS0006 C# Metadata file 'xxxxxxx.dll' could not be found

热门文章

  1. p12..Matplotlib:Contours等高线图
  2. cmd chcp命令切换字符格式
  3. 偷偷学Python,怎么高空建楼(Python自动化办公实现批量替换Word)
  4. 如何成为一名优秀的程序员(一)
  5. js递归理解及使用案例
  6. 【C++笔记】 判断两个数互质(做大公约数为1)
  7. 可修改UID的白卡,请问为什么一般读写器不能修改这种卡的UID,必须要特定的读写器才能修改?
  8. 【各种转换】数组转换成字符串,集合转换成字符串,字符串转集合
  9. git 重置用户名 密码信息
  10. 破解网吧电影,获取电影的实际地址!