Maxwell 的使用
1.Maxwell介绍
Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。
2.Maxwell 和canal工具对比
➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。
➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
➢Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。
➢Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
➢Maxwell比Canal更加轻量级。
3.安装
安装Maxwell

[root@hadoop01 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/3.4使用前准备工作
➢在数据库中建立一个maxwell库用于存储Maxwell的元数据[root@hadoop01 module]$ mysql -uroot -p123456mysql> CREATE DATABASE maxwell ;
➢分配一个账号可以操作该数据库mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;
➢分配这个账号可以监控其他数据库的权限mysql> GRANT SELECT,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@’%’;

4.使用Maxwell监控抓取MySQL数据
[[root@hadoop01 maxwell-1.25.0]$ cp config.properties.example config.properties
➢修改配置文件

producer=kafka
kafka.bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
#需要添加
kafka_topic=user_info_log
#mysql login info
host=hadoop01
user=maxwell
password=123456
#需要添加后续初始化会用
client_id=maxwell_1

注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打乱binlog的顺序如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性可选值producer_partition_by=database|table|primary_key|random|column

➢在/home/atguigu/bin目录下编写maxwell.sh启动脚本

[root@hadoop01 maxwell-1.25.0]$ vim/home/root/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

➢授予执行权限

[root@hadoop01 maxwell-1.25.0]$ sudo chmod +x /home/root/bin/maxwell.sh

➢运行启动程序

[atguigu@hadoop202 maxwell-1.25.0]$ maxwell.sh

➢启动Kafka消费客户端,观察结果

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic user_info_log

5.Maxwell版本的ODS层处理
5.1执行不同操作,Maxwell和canal数据格式对比
➢执行insert测试语句

INSERT INTO z_user_info VALUES(30,‘zhang3’,‘13810001010’),(31,‘li4’,‘1389999999’);

  canal

{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“li4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:"",“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}

  maxwell

{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}
{“database”:“gmall202004”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“li4”,“tel”:“1389999999”}}

5.2总结数据特点
➢日志结构
canal 每一条SQL会产生一条日志,如果该条Sql影响了多行数据,则已经会通过集合的方式归集在这条日志中。(即使是一条数据也会是数组结构)maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果想知道这些日志是否是通过某一条sql产生的可以通过xid进行判断,相同的xid的日志来自同一sql。
➢数字类型
当原始数据是数字类型时,maxwell会尊重原始数据的类型不增加双引,变为字符串。canal一律转换为字符串。
➢带原始数据字段定义
canal数据中会带入表结构。maxwell更简洁。

5.3SparkStreaming对Topic分流业务代码

// An highlighted block
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}import com.atguigu.gmall.realtime.utils.{MyKafkaSink, MyKafkaUtil, OffsetManagerUtil}i
import  org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Desc: 基于Maxwell从Kafka中读取业务数据,进行分流*/
object BaseDBMaxwellApp {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = newSparkConf().setMaster("local[4]").setAppName("BaseDBCanalApp")val ssc = new StreamingContext(sparkConf, Seconds(5))val topic = "ods_order_info"val groupId = "base_db_maxwell_group"//从Redis中读取偏移量var recoredDStream: InputDStream[ConsumerRecord[String, String]] = nullval kafkaOffsetMap:Map[TopicPartition,Long=OffsetManagerUtil.getOffset(topic,groupId)if(kafkaOffsetMap!=null &&kafkaOffsetMap.size >0){recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId)}else{recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,groupId)}
//获取当前采集周期中处理的数据对应的分区已经偏移量var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]val offsetDStream: DStream[ConsumerRecord[String, String]] =                                         recoredDStream.transform {rdd => {offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}
}
//将从kafka中读取到的recore数据进行封装为json对象
val jsonObjDStream: DStream[JSONObject] = offsetDStream.map {record => {//获取value部分的json字符串val jsonStr: String = record.value()//将json格式字符串转换为json对象val jsonObject: JSONObject = JSON.parseObject(jsonStr)jsonObject}
}//从json对象中获取table和data,发送到不同的kafka主题
jsonObjDStream.foreachRDD{rdd=>{rdd.foreach{jsonObj=>{val opType: String = jsonObj.getString("type")val dataJsonObj: JSONObject = jsonObj.getJSONObject("data")if(dataJsonObj!=null && !dataJsonObj.isEmpty && !"delete".equals(opType)){//获取更新的表名val tableName: String = jsonObj.getString("table")//拼接发送的主题var sendTopic = "ods_" + tableName//向kafka发送消息MyKafkaSink.send(sendTopic,dataJsonObj.toString())}}}    //修改Redis中Kafka的偏移量OffsetManagerUtil.saveOffset(topic,groupId,offsetRanges)}}ssc.start()ssc.awaitTermination()}}

5.4测试
➢启动Redis
➢启动Maxwel
l➢运行BaseDBMaxwellApp程序
➢查看kafka下的主题

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic ods_order_info

6.生产数据 查看情况

Maxwell 的使用相关推荐

  1. [物理学与PDEs]第1章第7节 媒质中的 Maxwell 方程组 7.2 媒质交界面上的条件

    通过 Maxwell 方程组的积分形式易在交界面上各量应满足交界面条件: $$\beex \bea \sez{{\bf D}}\cdot{\bf n}=\omega_f,&\sex{\omeg ...

  2. MySQL 的 Binlog 日志处理工具(Canal/Maxwell/Databus/DTS)对比

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Canal 定位:基于数据库增量日志解析,提供增量数据订阅 ...

  3. android 6.0虚拟内存,Maxwell的统一虚拟内存只用在专业卡上,GTX 750 Ti或无缘

    拼 命 加 载 中 ... 在GeForce GTX 750 Ti/750显卡上,我们已经见识到了Maxwell架构在高效能上的威力,但Maxwell架构到现在还是个迷,NVIDIA还没有公布详细的架 ...

  4. UA OPTI501 电磁波 求解Maxwell方程组的波动方程方法

    UA OPTI501 电磁波 求解Maxwell方程组的波动方程方法 标量势满足的2阶PDE 矢量势满足的2阶PDE Laplace算子的含义 标量势与矢量势的平面波解 Maxwell方程组: ∇⋅D ...

  5. UA OPTI501 电磁波 求解麦克斯韦方程组的Fourier方法3 Coulomb Gauge下讨论Maxwell方程

    UA OPTI501 电磁波 求解麦克斯韦方程组的Fourier方法3 Coulomb Gauge下讨论Maxwell方程 Use the macroscopic equation with D-fi ...

  6. UA PHYS515A 电磁理论III 静磁学问题1 Maxwell方程与静磁学问题

    UA PHYS515A 电磁理论III 静磁学问题1 Maxwell方程与静磁学问题 静磁学的Maxwell方程 毕奥-萨伐尔定律 安培定律 静磁学的Maxwell方程 假设电场与电位移为0,B⃗\v ...

  7. [物理学与PDEs]第1章第3节 真空中的 Maxwell 方程组, Lorentz 力 3.1 真空中的 Maxwell 方程组...

    1.稍微修正以前局部使用的方程组可以得到真空中的 Maxwell 方程组: $$\beex \bea \Div {\bf E}&=\cfrac{\rho}{\ve_0},\\ \rot{\bf ...

  8. 二阶偏微分方程组 龙格库塔法_深度科普---电磁波(三):无激励下的真空中的Maxwell方程组的解...

    很久没有写过与自己专业相关的文章了,于是计划穿插进几篇有关电磁波的深度科普的文章.计划分为几个部分: 1. 真空中的 方程组 2. 材料中的 方程组和电磁场的边值条件 3. 无激励下的真空中的 方程组 ...

  9. 如何使用 Kafka、MongoDB 和 Maxwell’s Daemon 构建 SQL 数据库的审计系统

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取后台回复"k8s",可领取k8s资料 本文要点 审计日志 ...

  10. 如何从几何角度上理解方程组只有一个解_深度科普---电磁波(三):无激励下的真空中的Maxwell方程组的解...

    很久没有写过与自己专业相关的文章了,于是计划穿插进几篇有关电磁波的深度科普的文章.计划分为几个部分: 1. 真空中的 方程组 2. 材料中的 方程组和电磁场的边值条件 3. 无激励下的真空中的 方程组 ...

最新文章

  1. 计算机图形学真实绘图实验报告,计算机图形学实验报告.doc
  2. mysql 前30秒_mysql – 简单查询需要15-30秒
  3. 大学计算机基础实验指导试题,(大学计算机基础实验指导)模拟试题(二)参考答案...
  4. 关键字_Java Volatile关键字
  5. WordPress4.8.1版本存在XSS跨站攻击漏洞
  6. 不固定图片宽高瀑布流_APP设计学习:瀑布流式的产品UI设计
  7. 蜘蛛侠3.1(无错版)站群分享源码 带视频,关键字软件安装使用教程
  8. JavaScript 工作原理之二-如何在 V8 引擎中书写最优代码的 5 条小技巧(译)
  9. STM32启动模式及API(转)
  10. OpenStack温哥华峰会Day2日记:大数据带你看峰会热点
  11. python高次方程求根公式,Python简单求解高阶方程的数值解
  12. vue2和vue3 的 keep-alive的用法
  13. h5 移动开发 html页面跳转,iosh5混合开发项目仿app页面跳转优化
  14. C#API说明【Write函数】[汉字转拼音]
  15. 模拟斗地主,按顺序初始化54张扑克牌,然后进行洗牌,随机抽取三张底牌,剩下的按照顺序进行发牌,打印出三个人的牌和底牌(ArrayList)
  16. Android 8.0 VTS 测试 FAIL 项解决记录
  17. 关于128x64OLED屏幕字体类型以及字体大小添加方法
  18. 区间素数个数 树状数组 HIT 1867 经理的烦恼
  19. 遗传算法pareto matlab,matlab遗传算法三目标优化,出来的pareto前沿图只是二维图...
  20. table标签,制作表格注册表

热门文章

  1. python计算协方差矩阵_协方差矩阵、相关系数矩阵的EXCEL和python实现
  2. 大数据助公交行业降成本提效率
  3. html px2rem转化器,【技术】移动端适配 px2rem/px2vw 的原理与实现
  4. python爬虫代理服务器_Python爬虫之服务器:代理IP万能
  5. magento 2 Send Tracking Information 发送跟踪信息 无法发送邮件
  6. 6nm工艺的微观神话,于毫厘间雕刻5G的“神笔马良”
  7. Canoe 安装流程
  8. 医疗行业怎么做数据防泄露
  9. 人生:中国青年女科学家颜宁教授:不向前走,你根本不能轻易定义成功与失败
  10. Layui分页乱码,时间控件乱码