一、binlog实时同步

1.binlog介绍
是一个二进制格式的文件,用于记录用户对数据库更新的SQL语句信息,例如更改数据库表和更改内容的SQL
语句都会就到binlog里,但对库表等内容的查询不会记录。
默认情况下,binlog日志是二进制格式的,不能使用文本查看工具的命令查看(例如cat、vi等),而使用mysqlbinlog解析
查看。

2.binlog的作用
当有数据写入数据库时,会同时把更新的sql语句写入对应的binlog文件里。
使用mysqldump备份时,只是对一段时间的数据进行全备,但如果备份后突然发现数据库服务器故障,这个时候就要用到
binlog的日志了。
主要作用是用于数据库的主从复制以及数据的增量恢复。

3.如何开启binlog日志功能?
修改/etc/my.cnf 配置文件,将下面的内容拷贝进入配置文件。
#binlog日志存路径
log-bin=/youfan/mysqlbinlogdata/mysql-bin
#日志记录形式,日志中会记录成每一行行数据被修改的形式
binlog-format=ROW
#指定当前机器的服务ID(若是集群不能重复)
server_id=1

my.cnf说明:
my.cnf是mysql启动时加载的配置文件,一般会放在mysql的安装目录中,用户也可以放在其他目录加载.
使用locate my.cnf命令可以列出所有的my.cnf文件,查找my.cnf的位置,可以输入 whereis my.cnf

查看MySQL默认读取my.cnf的目录,mysql默认会搜寻my.cnf的目录,顺序排前的优先。
mysql --help|grep 'my.cnf'

用rpm包安装的MySQL是不会安装/etc/my.cnf文件的.
至于为什么没有这个文件而MySQL却也能正常启动和作用,在点有两个说法,
第一种说法,my.cnf只是MySQL启动时的一个参数文件,可以没有它,这时MySQL会用内置的默认参数启动,
第二种说法,MySQL在启动时自动使用/usr/share/mysql目录下的my-medium.cnf文件,这种说法仅限于rpm包安装的MySQL,
解决方法,只需要复制一个/usr/share/mysql目录下的.cnf文件到/etc目录,并改名为my.cnf即可。

4.重启mysql服务并查看mysql日志
#重启服务器
systemctl restart mysqld.service
#另外一种重启方式
/etc/init.d/mysql stop 
/etc/init.d/mysql start

#登录
mysql -uroot -p
#查看配置信息
show variables like '%log_bin%'

#报错则执行以下语句
set global show_compatibility_56=on;

#查看binlog日志内容,模拟插入一条数据
use datawarehouse;
show tables;
insert into hongbaoinfo values(8,'测试红包',100,1,1,1,1);

#查看第一个binlog内容
show binlog events;

#查看所有的binlog文件列表
show binary logs;

#查看指定日志文件内容(在mysql外部执行)
mysqlbinlog -d datawarehouse /var/lib/mysql/mysql-bin.000002

二、数据库实时同步

1.选型要求:保证数据的一致性、及时性、完整性、以及代码无侵入性
-------------------------------------------------------------------------------------
实现方式    优缺点
代码实现    技术难度低,侵入性强,实时性高
基于mybatis    有一定的技术难度,但是无法覆盖所有的场景
Aop实现        技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash    技术难度低,无侵入性,无需开发,但会造成资源浪费。
canal        实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费

2.canal介绍:
是阿里巴巴的一个开源项目,基于java实现,整体已经在很多大型的互联网项目生产环境中使用,
包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可。

3.canal底层原理:
模拟层mysql slave的方式,监听mysql的binlog日志来获取数据,binlog设置为row模式后,可以获取到每一个增删改
的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal能高性能的获取到mysql数据的变更。

4.canal的使用
分区server端和client端
Server端部署好之后,直接监听mysql binlog,它只能接收数据,没有进行任何逻辑处理。
Client端需要大家进行简单的开发。

5.canal Adapter
官方做了独立组件Adapter,可以将canal server端获取的数据转换成几个常用的中间件数据源,目前支持kafka,rocketmq,
hbase,es,针对这几个中间件的支持,直接配置即可,无需开发。

6.下载安装cannal
#新建目录
mkdir canal
#解压canal
tar -zxvf canal.deployer-1.1.2.tar.gz

#配置文件
cd conf
vim canal.properties

# 默认值tcp,这里改为投递到Kafka
canal.serverMode = kafka
# Kafka bootstrap.servers,可以不用写上全部的brokers
canal.mq.servers = 192.168.50.104:9092
# 投递失败的重试次数,默认0,改为2
canal.mq.retries = 2
# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,这里改为200ms
# 满足batch.size和linger.ms其中之一,就会发送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory,缓存大小,默认32M
canal.mq.bufferMemory = 33554432
# 获取binlog数据的批次大小,默认50
canal.mq.canalBatchSize = 50
# 获取binlog数据的超时时间,默认200ms
canal.mq.canalGetTimeout = 200
# 是否将binlog转为JSON格式。如果为false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 压缩类型,官方文档没有描述
canal.mq.compressionType = none
# Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack
# 0表示不等待ack,1表示leader写入完毕之后直接ack
canal.mq.acks = all
# Kafka消息投递是否使用事务
# 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal binlog位置的一致性
# flatMessage模式下建议开启
canal.mq.transaction = true

cd example/
vim instance.properties
#主机id
canal.instance.master.address=127.0.0.1:3306

#数据库的用户名和密码
canal.instance.dbUsername=canal1
canal.instance.dbPassword=canal1

#kafka topic
canal.mq.topic=test1

#创建数据库用户
CREATE USER canal1 IDENTIFIED BY 'canal1';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal1'@'%';
FLUSH PRIVILEGES;

#启动cannal
bin/startup.sh
jps中包含 CanalLauncher即可

#查看日志
tail -f logs/example/example.log

#启动kafka
zkServer.sh start(查看启动状态zkServer.sh status)

#启动kafka:
cd /usr/local/kafka 
./kafka-server-start.sh ../config/server.properties
#监听test1 topic
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1

#数据库执行insert语句
insert into hongbaoinfo values(9,'测试红包1',100,1,1,1,1);

四、java项目

新建BillionDataAnaly模块
新建包com.youfan.transfer
新建类TransferData.java
修改pom.xml,增加依赖。
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

flink代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.50.104:9092");
properties.setProperty("group.id", "youfan");
//构建FlinkKafkaConsumer
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test1", new SimpleStringSchema(), properties);
指定偏移量
//myConsumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(myConsumer);
env.enableCheckpointing(5000);
stream.print();
env.execute("Flink Streaming Java API Skeleton");

binlog实时同步相关推荐

  1. mysql switch binlog_TiDB binlog实时同步数据到下游Kafka

    1 TiDB测试集群,使用tiup进行安装部署和运维操作. 集群状态如下: 2 简要介绍一下TiDB binlog架构TiDB Binlog集群主要分为Pump和Drainer两个组件,以及binlo ...

  2. binlog流程 mysql_小米 MySQL 数据实时同步到大数据数仓的架构与实践

    背景MySQL由于自身简单.高效.可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,MySQL的相关操作会变的非常迟缓:如果这时还有实时BI展示的需求,对于mysql来说 ...

  3. 如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步

    转载自 如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步 近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台.优先 ...

  4. centos7时间同步_基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步

    点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 作者:Throwable    掘金:https://juejin.im/post ...

  5. 基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步

    前提 近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台.优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存.更新或者软删除)到一个另 ...

  6. 百万级商品数据实时同步,查询结果秒出

    来自:微微科技公司 前阵子老板安排了一个新任务,要建设一个商家商品搜索系统,能够为用户提供快速.准确的搜索能力,在用户输入搜索内容时,要能从商家名称和商品名称两个维度去搜索,搜索出来的结果,按照准确率 ...

  7. binlog工具_基于Binlog实时同步数仓,有哪些不为人知的坑?

    最近看到一篇文章<基于Canal与Flink实现数据实时增量同步>,主要讲解的是基于Flink有关于MySQL Binlog数据采集的方案,看了一下实践方法和具体代码操作,感觉有一些欠考虑 ...

  8. mysql hadoop架构,Debezium实现Mysql到Elasticsearch高效实时同步

    问题导读 1. Debezium有哪些特点? 2.Debezium如何实现Mysql到ES增删改实时同步? 3.如何配置connector连接器? 题记 来自Elasticsearch中文社区的问题- ...

  9. MySQL亿级数据量实时同步,小米如何完美hold住

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"1024"获取公众号专属1024GB资料 作者丨刘心光 来源丨小米云技术(ID:mi- ...

最新文章

  1. 滴滴 Elasticsearch 集群跨版本升级与平台重构之路
  2. 浏览器在线预览pdf、txt、office文件
  3. 3d制作中需要注意的问题_珠宝首饰工艺篇-戒指3D造型设计制作注意要点
  4. erlang环境变量——HOME
  5. alpine linux安装java,alpinelinux安装openjre
  6. [软件工程] 面向对象方法学引论
  7. 3个国内最大的黑客学习网站
  8. c语言sum求和程序,C语言实现的统计素数并求和代码分享
  9. Java拿到前一天的零点零分
  10. QGIS二次开发2:添加矢量、栅格图层及图层列表的实现
  11. 开涛spring3(6.9) - AOP 之 6.9 代理机制
  12. 8个亿!河南首富再次无偿捐款西湖大学,西湖大学河南籍校董高达11位
  13. 3.提取线稿(PS)
  14. Efficient Parameter-free Clustering Using First Neighbor Relations
  15. 金融分析与风险管理——期权类型及到期时的盈亏
  16. 方法的调用,构造方法,方法的重载
  17. ARP病毒终极解决方案
  18. CentOS 目录结构介绍
  19. 计算机cpu的风扇是多少电压,哪位知道cpu风扇电压是多少?
  20. 独自封装windows 10系统详细教程(四)

热门文章

  1. 什么是Linux发行版 以及各发行版的区别
  2. Linux usleep不准问题排查
  3. bat批处理开发-wifi联网系列(3):查询当前连接的wifi SSID和密码,封装为bat函数(如何传递入参和返回出参)
  4. Oracle数据库实验四查询实验三(Oracle 11g)
  5. 2019-7-27 [MySQL] DQL 简单查询[别名/去重/运算] 条件查询 排序查询 聚合查询 分组查询 导出与导入 多表操作[一对多/多对多][创外键 创联合主键 约束 添加 删除 测试]
  6. java与C/C++的比较
  7. PPP概念股有哪些?PPP概念股大全
  8. 计算方法 7.数值积分计算方法
  9. 【微信小程序】小程序实现文件的上传及预览,以PDF文件为例。
  10. IntelliJ IDEA 2018版本操作总结(长期更新)