一、Canal架包下载上传

(一)下载

官网架包地址为:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2

本人百度云盘下载地址:

链接:https://pan.baidu.com/s/1MM5YGubaTW3Y2hy1tvBmPw

提取码:jiur

(二)上传解压

创建canal文件夹

cd /usr/local

mkdir canal

将下载好的canal上传至Linux服务器/usr/local/canal目录下进行解压。

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz

二、配置MySQL文件

(一)修改MySQL my.cnf配置文件

1.查找MySQL在Linux环境中的my.cnf

mysql --help|grep 'my.cnf'

如图:

2.修改my.cnf

vi /etc/my.cnf

log-bin=mysql-bin #添加这一行就ok

binlog-format=ROW #选择row模式

server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

(二)重启 MySQL

查看MySQL启动状态

service mysqld status(5.0版本是mysqld)

service mysql status(5.5.7版本是mysql)

重启MySQL

service mysqld restart

service mysql restart (5.5.7版本命令)

(三)查看MySQL binlog文件是否开启

1.在Linux中登录MySQL

mysql -u 用户名 -p

如:mysql -u root -p

输入密码对应的账号密码

2.查看binlog文件是否开启

show variables like 'log_%';

效果如下:

三、创建canal账号

(一)设置canal账号并赋权

drop user 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; -- 创建canal用户

grant all privileges on *.* to 'canal'@'%' identified by 'canal'; -- 为canal用户赋予repication权限

flush privileges;

如果出现以下情况,说明MySQL设置了密码难度,需要修改MySQL设置,如果没有出现则调过以下步骤

1.查看MySQL的策略

SHOW VARIABLES LIKE 'validate_password%';

2.设置MySQL密码验证强调的策略

set global validate_password_policy=LOW;

3.设置MySQL密码最低长度

set global validate_password_length=5;

4.重新设置一下账号即可

drop user 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; -- 创建canal用户

grant all privileges on *.* to 'canal'@'%' identified by 'canal'; -- 为canal用户赋予repication权限

flush privileges;

如图

四、构建CanalService

(一)修改instance.properties配置

vi /usr/local/canal/conf/example/instance.properties

canal.instance.master.address=127.0.0.1:3306

canal.instance.dbUsername=canal#此处是我们为mysql配置的canal用户

canal.instance.dbPassword=canal#此处是我们为mysql配置的canal用户的密码

canal.mq.topic=mysql-yjlcplatform-topic #mq消息主题

(二)修改canal.properties

vi /usr/local/canal/conf/canal.properties

canal.serverMode = kafka

kafka.bootstrap.servers = 192.168.200.7:9092

(三)启动canal

cd /usr/local/canal/bin

./ startup.sh

查看canal启动日志

cat /usr/local/canal/logs/example/example.log

以下效果需要等一会儿

注意:

在启动canal的时候可能会报kafka连接超时,则重新启动kafka即可。

(四)验证MySQL与kafka是否关联成功

下载ZooInspector工具进行验证:

链接:https://pan.baidu.com/s/1SbiszPvYVfbmdDQRsdLAqg

提取码:cyb8

五、后端项目代码

(一)Pom配置文件

org.springframework.kafka

spring-kafka

(二)bootstrap.yml配置文件

# kafka

spring:

kafka:

# kafka服务器地址(可以多个)

bootstrap-servers: 192.168.200.7:9092

consumer:

# 指定一个默认的组名

group-id: kafka2

# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: earliest

# key/value的反序列化

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

producer:

# key/value的序列化

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

# 批量抓取

batch-size: 65536

# 缓存容量

buffer-memory: 524288

(三)Java文件

1.启动类

/**

* --------------------------------------------------------------

* FileName: AppMemberCanalClient.java

*

* @Description:消费端

* @author: cyb

* @CreateDate: 2020-09-07

* --------------------------------------------------------------

*/

@SpringBootApplication

public class AppMemberCanalClient {

public static void main(String[] args) {

SpringApplication.run(AppMemberCanalClient.class);

}

}

2.后端监听类

package com.yjlc.kafka.client;

import com.alibaba.fastjson.JSONArray;

import com.alibaba.fastjson.JSONObject;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

@Component

public class MembetKafkaConsumer {

@KafkaListener(topics = "mysql-yjlcplatform-topic")

public void receive(ConsumerRecord, ?> consumer) {

System.out.println("topic名称:" + consumer.topic() + ",key:" +

consumer.key() + "," +

"分区位置:" + consumer.partition()

+ ", 下标" + consumer.offset() + "," + consumer.value());

String json = (String) consumer.value();

JSONObject jsonObject = JSONObject.parseObject(json);

String type = jsonObject.getString("type");

String pkNames = jsonObject.getJSONArray("pkNames").getString(0);

JSONArray data = jsonObject.getJSONArray("data");

String table = jsonObject.getString("table");

String database = jsonObject.getString("database");

for (int i = 0; i < data.size(); i++) {

JSONObject dataObject = data.getJSONObject(i);

String key = database + ":" + table + ":"+dataObject.getString(pkNames);

switch (type) {

case "UPDATE":

case "INSERT":

break;

case "DELETE":

break;

}

}

}

}

以上功能亲测有效。如对以上内容有疑问的可以留言讨论,转载请说明出处,本人博客地址为:https://www.cnblogs.com/chenyuanbo/

技术在于沟通交流!

canal下载 linux_canal实时同步mysql数据到redis或ElasticSearch相关推荐

  1. 使用canal实时同步MySQL数据到Elasticsearch

    使用canal实时同步MySQL数据到Elasticsearch 搭建环境 安装 elasticsearch 安装 kibana 下载和安装canal 1.下载canal 2.配置MySQL 3.配置 ...

  2. 利用Canal全量/增量同步mysql数据至ES

    Canal同步mysql数据至ES 1.更改Mysql配置 1.1 开启 Binlog 写入功能 配置 binlog-format 为 ROW 模式,配置my.cnf [mysqld] log-bin ...

  3. 使用maxwell实时同步mysql数据到kafka

    一.软件环境: 操作系统:CentOS release 6.5 (Final) java版本: jdk1.8 zookeeper版本: zookeeper-3.4.11 kafka 版本: kafka ...

  4. Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1.服务器规划 10.4.7.11 node1 10.4.7.12 node2 10.4.7.13 node3 1. 集群相关    一个运行中的 Elastics ...

  5. Flink CDC 实时同步mysql

    前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...

  6. elasticsearch通过logstash同步mysql数据(中文分词)

    Elasticsearch 目录 概述 索引(Index) 类型(Type) 文档(Document) 倒排索引(Inverted Index) 节点(Node) 安装 启动 DSL(查询语句) 官方 ...

  7. Flink-cdc 同步mysql数据

    下载地址:https://github.com/ververica/flink-cdc-connectors/releases 这里下载2.2.0版本:https://github.com/verve ...

  8. confluent实时同步sqlserver数据到kafka

    安装准备 192.168.23.132 (主机名spark01) 192.168.23.133 (主机名spark02) 192.168.23.134 (主机名spark03) confluent-5 ...

  9. Logstash同步mysql数据

    Logstash同步mysql数据到ElasticSearch 1.安装logstash 下载logstash,解压压缩包 下载地址:https://www.elastic.co/downloads/ ...

最新文章

  1. Spring Boot中使用LDAP来统一管理用户信息
  2. 【Python面试】 说说Python模块主要分哪三类?
  3. Mysql基础语法DDL、DML、DQL
  4. jenkins 自动化部署常用插件
  5. php闪屏程序,节日闪屏的两种构成方式
  6. W ndows10用于3D建模,Windows10系统自带3D builder应用有哪些作用
  7. MyBatis使用foreach批量插入一个含List<实体>成员变量的实体类
  8. 互联网名词解释(通用、运营、广告、APP推广、移动推广、APP运营、商务模式、职位、客户管理)
  9. python课程设计汽车销售管理系统_汽车销售管理系统--课程设计
  10. 【C++】1070:人口增长(信息学奥赛)
  11. python图形化编程 在线教程_使用Python Editor进行在线图形化编程
  12. 学python历程中
  13. 鸡兔同笼python程序怎么写_关于鸡的歇后语
  14. 反序列化漏洞利用总结
  15. C#--集合添加数据(ArrayList and list)
  16. 你了解设计公司的服务内容有哪些吗?
  17. 易语言传文本到c 崩溃,win7系统易语言打开支持库配置就崩溃的解决方法
  18. 【MATLAB】Linux下的matlab的安装
  19. RS232 / TTL / CMOS 电平
  20. 顶尖众筹商业模式实操案例分享,打开你全新的赚钱思维!

热门文章

  1. R语言ggplot2可视化:使用热力图可视化dataframe数据、自定义设置热力图的颜色、自定添加标题、轴标签、热力图线框等
  2. R语言使用hexbin包的hexbin函数可视化散点图、应对数据量太大、且有数据重叠的情况、普通散点图可视化效果变差的情况、提供了对六边形单元格的二元绑定、通过图例颜色标定每一个区域数据点的数量
  3. R语言使用ggplot2包使用geom_dotplot函数绘制分组点图(改变分组的次序)实战(dot plot)
  4. Python将图像分割成小块然后将所有的块重新拼接在一起
  5. python构建t检验(Student’s t-test)
  6. python使用psutil获取系统(Windows Linux)所有运行进程信息实战:CPU时间、内存使用量、内存占用率、PID、名称、创建时间等;
  7. 线性判别分析(LDA)和她的家人们
  8. fgbio,picard处理带有UMI的fq序列
  9. (邓爱萍)类对象-this关键字
  10. wince5使用access数据库_关于wince系统支持什么数据库的阿里云论坛用户知识和技术交流...