【上一章 【Kafka】第二篇-Kafka的核心概念及分区消费规则】

学习路线

  • Kafka集群架构
  • Kafka集群环境
    • 1、kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka;
    • 2、配置kafka集群:server.properties
      • (1)三台分别配置为:
      • (2)三台分别配置listener=PAINTEXT:IP:PORT
      • (3)配置日志目录
      • (4)配置zookeeper连接地址
      • (5)设置副本个数
    • 集群启动:
  • Kafka案例实战
  • What is Canal?
    • 工作原理
      • MySQL的主从复制将经过如下步骤:
    • Canal工作原理
    • Canal使用场景
  • Canal运行环境
    • MySQL环境的准备
      • 相关命令:
      • Canal要注意binlog日志格式要求为row格式;
    • Canal环境准备
      • 1、下载 canal部署程序
      • 2、配置修改
      • 3、启动Canal
      • 4、查看进程:
      • 5、查看 server 日志
      • 6、查看 instance 的日志
      • 7、关闭Canal
  • MySQL+Canal+Kafka应用实践
    • 1.修改canal 配置文件
    • 2.修改instance配置文件:
  • 结合ELK实现日志收集系统

Kafka集群架构

  • 一个Kafka 集群体系架构包括多个Producer、多个Broker 、多个Consumer,以及一个zooKeeper 集群,其中 ZooKeeper是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的,Producer将消息发送到 Broker, Broker 负责将收到的消息存储到磁盘中,而Consumer 负责从 Broker 订阅并消费消息;

  • 我们知道Kafka的一个topic下可以有多个分区,每个分区又引入了多副本 Replica机制,通过增加副本数量可以提升容灾能力,同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中leader副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步,副本处于不同的broker,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务,Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker故障时仍然能保证服务可用;

  • 上图中,Kafka集群中有4个broker,某个topic主题中有3个分区,且副本因子(即副本个数)也为3,那么每个分区便有1个leader副本和2个follower 副本,生产者和消费者只与leader副本进行交互,而follow副本只负责消息的同步,很多时候 follower 副本中的消息相对leader副本而言会有一定的滞后;

  • 分区中的所有副本统称为AR (Assigned Replicas),所有与leader副本保持一定程度同步的副本组成 ISR (On-Sync Replicas) , ISR 集合是AR集合中 一个子集,消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度滞后,leader副本同步滞后过多的副本(不包 leader 副本)组成 OSR (Out-of-Sync Replicas ),即AR=ISR+OSR,在正常情况下,follower副本都应该与 leader 副本保持一定程度的同步,即AR=ISR,OSR集合为空;

  • leader 副本负责维护和跟踪ISR集合中所有follower的滞后状态,follower副本落后或故障时,leader副本会把它从ISR集合中剔除,OSR集合中若有 follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至 ISR 集合,leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR 集合中的副本没有机会被选举为leader;

  • OrderTopic --> 指定了3个分区 -->每一个分区指定3个副本(备份)
    这3个副本包含了主副本(所以就是1个主副本,2个从副本)

Kafka集群环境

1、kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka;

2、配置kafka集群:server.properties

(1)三台分别配置为:
broker.id=1、broker.id=2、broker.id=3

该配置项是每个broker的唯一id,取值在0~255之间;

(2)三台分别配置listener=PAINTEXT:IP:PORT
listeners=PLAINTEXT://192.168.1.1:9091
listeners=PLAINTEXT://192.168.1.1:9092
listeners=PLAINTEXT://192.168.1.1:9093

三台分别配置advertised.listeners=PAINTEXT:IP:PORT

advertised.listeners=PLAINTEXT://192.168.1.1:9091
advertised.listeners=PLAINTEXT://192.168.1.1:9092
advertised.listeners=PLAINTEXT://192.168.1.1:9093
(3)配置日志目录
log.dirs=/usr/local/kafka_2.13-2.5.0-01/logs/kafka-logs
log.dirs=/usr/local/kafka_2.13-2.5.0-02/logs/kafka-logs
log.dirs=/usr/local/kafka_2.13-2.5.0-03/logs/kafka-logs

这是极为重要的配置项,kafka所有数据就是写入这个目录下的磁盘文件中的;

(4)配置zookeeper连接地址
zookeeper.connect=localhost:2181
如果zookeeper是集群,则:
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
(5)设置副本个数
offsets.topic.replication.factor=3

集群启动:

用 daemon参数

./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh -daemon ../config/server.properties

然后查看/logs/server.log日志文件,查看启动是否正常;
Kafka 集群会定期自动关闭但却无法找到原因,从日志上broker 是正常关闭而非异常崩溃,原因就是启动方式没有使用-daemon;

Kafka案例实战

  • MySQL + Canal + kafka + logstash + elasticsearch + kibana (或者elasticsearch-head)

  • MySQL + Canal + kafka + 自己写个client消费kafka–> 业务处理
    系统 + kafka + logstash + elasticsearch + kibana (或者elasticsearch-head)

  • 抖音/快手: 系统日志 --> kafka --> logstash --> elasticsearch --> kibana (或者elasticsearch-head)

What is Canal?

  • Canal [k?'n?l],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

  • 历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了Canal项目;
    Github:https://github.com/alibaba/canal

工作原理

传统MySQL主从复制工作原理

从上层来看,复制分成三步:

MySQL的主从复制将经过如下步骤:

  • 1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
  • 2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
  • 3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
  • 4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
  • 5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
  • 6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

Canal工作原理

  • 1、canal 模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发送dump 协议;
  • 2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即canal );
  • 3、canal 解析 binary log 对象 (原始数据为byte流)

Canal使用场景

Canal是基于MySQL变更日志增量订阅和消费的组件,可以使用在如下一些一些应用场景:

  • 数据库实时备份;
    -业务cache刷新;
  • search build;
  • 价格变化等重要业务消息;
  • 带业务逻辑的增量数据处理;
  • 跨数据库的数据备份(异构数据同步) 例如mysql => oracle,mysql=>mongo,mysql =>redis,mysql => elasticsearch等;
    当前canal 主要是支持源端 MySQL(也支持MariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;

Canal运行环境

MySQL环境的准备

  • 1、准备好MySQL运行环境;(安装好)
  • 2、开启 MySQL的binlog写入功能,配置 binlog-format 为ROW模式;
    my.cnf中配置如下:
[mysqld]
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的slaveId重复
  • 3、授权canal连接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接 grant授权:
    (1)启动MySQL服务器;
    (2)登录mysql:./mysql -uroot -p -h127.0.0.1 -P3306
    (3)执行如下命令:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

相关命令:

#是否启用了日志

show variables like 'log_bin';

#怎样知道当前的日志

show master status;

#查看mysql binlog模式

show variables like 'binlog_format';

#获取binlog文件列表

show binary logs;

#查看当前正在写入的binlog文件

show master status\G

#查看指定binlog文件的内容

show binlog events in 'mysql-bin.000002';

Canal要注意binlog日志格式要求为row格式;

Binlog的三种基本类型分别为:

  • ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看;
  • STATEMENT模式 只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
    MIX模式 比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为- - statement模式,当遇到了数据更新或者删除情况下就会变为row模式;

Canal环境准备

1、下载 canal部署程序

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

2、配置修改

vim conf/example/instance.properties

主要是修改配置文件中与自己的数据库配置相关的信息;

canal.instance.master.address=127.0.0.1:3306

(其他的都不需要修改)

3、启动Canal

./startup.sh

4、查看进程:

ps -ef | grep canal

5、查看 server 日志

cat logs/canal/canal.log

6、查看 instance 的日志

vi logs/example/example.log

7、关闭Canal

./stop.sh

Canal Server的默认端口号为:11111,如果需要调整的话,可以到/conf目录下的canal.properties文件中进行修改;
至此,我们的MySQL和Canal环境便准备OK了;

MySQL+Canal+Kafka应用实践

1.修改canal 配置文件

vim conf/canal.properties
  • (1)
#可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
  • (2)
##################################################
#########                    MQ                      #############
##################################################
canal.mq.servers = 192.168.172.128:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

2.修改instance配置文件:

在canal目录下

vim conf/example/instance.properties
  • (1)
canal.instance.master.address=127.0.0.1:3306
  • (2)
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
  • 3.启动:
先启动mysql
再启动zookeeper
再启动kafka
最后启动canal

查看日志:

vim logs/canal/canal.log

消费者接收kafka消息:

./kafka-console-consumer.sh -bootstrap-server 192.168.1.1:9091,192.192.168.1.1:9092,192.168.1.1:9093 --topic mytopic --from-beginning

或者使用我们的程序接收消息;

结合ELK实现日志收集系统

  • 数据(任何数据)–> kafka --> logstash --> elasticsearch --> kibana (elasticsearch-head插件)
  • logstash-from-kafka-to-es.yml
input {kafka {bootstrap_servers => "localhost:9091"
topics => ["test"]
}
}output {elasticsearch {hosts => ["localhost:9200"]
index => "kafka-topic-%{+YYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
  • 启动
./logstash -f ../config/logstash_from_kafka_to_es.yml

【下一章 【Kafka】第四篇-Kafka为什么这么快?】】

【Kafka】第三篇-Kafka的集群及Canal介绍相关推荐

  1. Kafka 详解(二)------集群搭建

    这里通过 VMware ,我们安装了三台虚拟机,用来搭建 kafka集群,虚拟机网络地址如下: hostname                      ipaddress             ...

  2. kafka tool 2.1连接kerberos的kafka(cdh6.3.2)集群。

    kafka tool 2.1连接kerberos的kafka(cdh6.3.2)集群. kafka tools 下载连接:https://www.kafkatool.com/download.html ...

  3. Kafka基于Zookeeper搭建高可用集群实战

    Kafka基于Zookeeper搭建高可用集群实战 1 前言 1.1 高可用的由来 为何需要Replication? 在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Bro ...

  4. SpringCloud(第 051 篇)EurekaServer集群高可用注册中心以及简单的安全认证

    SpringCloud(第 051 篇)EurekaServer集群高可用注册中心以及简单的安全认证 - 一.大致介绍 1.前面章节分析了一下 Eureka 的源码,我们是不是在里面注意到了 Peer ...

  5. windows+mysql集群搭建-三分钟搞定集群

    一.集群了解 计算机一级考试系统要用集群,目标是把集群搭建起来,保证一个库dang了,不会影响程序的运行.首先看了一些关于集群的资料,然后根据步骤一步步的整,遇到了一些问题,在这里把我遇到的问题以及解 ...

  6. 【云原生】第十篇--Docker主机集群化方案 Docker Swarm

    Docker主机集群化方案 Docker Swarm 一.docker swarm介绍 二.docker swarm概念与架构 2.1 架构 2.2 概念 三.docker swarm集群部署 3.1 ...

  7. 《叶问》37期,三节点的MGR集群关掉两个节点后还能继续读写吗

    不发碎碎念了,唠叨那些没啥意思,重回『叶问』正轨. 1. 三节点的MGR集群关掉两个节点后还能继续读写吗 这里要先明确一个前提,两个节点是正常关闭MGR服务,还是异常宕机. 如果两个节点是手动执行 s ...

  8. 搭建服务器集群的方法介绍

    搭建服务器集群的方法介绍 搭建本地服务器集群 软硬件要求 安装服务器 网络配置 搭建本地服务器集群 软硬件要求 一台电脑(系统不限,配置高一点更好) VirtualBox Centos7 Virtua ...

  9. Citus集群拓扑架构介绍

    作者:杨杰 简介 Citus是Postgres的开源扩展,将Postgres转换成一个分布式数据库,在集群的多个节点上分发数据和查询,具有像分片.分布式SQL引擎.复制表和分布式表等特性. 因为Cit ...

最新文章

  1. mysql 批量修改数据库存储引擎_mysql批量修改表存储引擎
  2. Python的运算符与表达式
  3. 利用datagrip从hive导入csv数据(还没整理完)
  4. SAP Spartacus UnitDetailsComponent的路由跟踪
  5. android:id=@android:id/tabhost 、android:id=@+id/llRoot 、android:id=@id/llRoot 之间的区别...
  6. java斗地主怎么出牌_斗地主滑动选牌出牌(Cocos Creator)
  7. Java金融计算机计算irr_手把手教你使用金融计算器
  8. php程序员自我描述_php程序员自我评价简历范文
  9. eclipse运行代码后变为红色和绿色
  10. JSR303数据校验
  11. 关于OpenCV for Python入门-face_recognition实现人脸识别
  12. python测试之道进阶_深入学习AB测试(一)-AB Testing With Python[项目实战]
  13. 计算机教师培训项目申报书,课题《基于培养教师信息素养的教学研究》申报书(2013年4月—2015年3月)...
  14. 人工智能知识图谱研究
  15. 苹果三代耳机_p360 AirPods Pro 苹果三代耳机
  16. Nanopi NEO Core测试
  17. 中国电子商务知识产权保护回顾与展望 | 连载(二)
  18. autojs脚本代码大全(实战演练2)
  19. 记一次在VMware中安装黑苹果的经历
  20. rust跳人头上_rust怎么跳的远 | 手游网游页游攻略大全

热门文章

  1. POJ2942 UVA1364 Knights of the Round Table 圆桌骑士
  2. matlab视网膜血管分割,视网膜血管增强与分割算法研究
  3. websocketpp wss
  4. 《MySQL数据库应用技术》
  5. 腾讯云轻量应用服务器搭建后端服务-基于django,nginx,uwsgi,supervisor
  6. 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢?
  7. JavaScript中BOM和DOM(持续更新)
  8. java编写火车订票系统_毕业设计(论文)-基于JavaWeb技术的火车订票系统.doc
  9. 小白终是踏上了这条不归路----小文的mysql学习笔记(22)--------函数
  10. Flutter Decoration背景设定(边框、圆角、阴影、形状、渐变、背景图像等)