一、先上效果图,再做阐述

DEMO

主从延迟,快速定位哪个表引起的

二、背景

当生产业务出现了主从延迟,DBA无法立刻知道是什么操作引起的,需要解析binlog/relay log、分析,处理问题的时间就是业务被影响的时间,所以不够快。

每张表或者每个库的DML的历史趋势,DBA是需要了解TOP N的,才能更好的分析业务、帮助业务。

其他,不一一罗列,活学活用,方可对症下药

三、实现工具

binlog2sql工具(本质python脚本)进行微改

MySQL

Grafana

四、实现原理

原理比较简单:

binlog2sql,模拟slave,与生产master建立主从关系(类似canal)。

解析binlog event,分析db、table_name、dml类型(insert/update/delete)、event时间。

按照分钟级别、小时级别对db、table_name、dml类型、dml总数、每分钟/每小时记录。

为什么选择生产master,而不是生产slave节点建里关系?因为如果生产slave发生主从延迟,那么binlog2sql依赖延迟的slave,实时性会不可控。

一定要记录binlog event时间作为dml发生时间,记录入库也要记录当前时间,这样可以准确判断DML发生时间同时,也可以判断Binlog2SQL的延迟时间。

这里记录结果用的MySQL并按照分钟、小时两个维度,为了利用MySQL减少对结果集的聚合统计,比较通用。可以选择时序DB或者ClickHouse,效果更佳。

五、源码附上

记录结果MySQL表结构

CREATE TABLE `t_monitor_mysql_table_dml_by_minute` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`ip` varchar(100) DEFAULT NULL,

`port` int(11) DEFAULT NULL,

`table_schema` varchar(100) DEFAULT NULL,

`table_name` varchar(200) DEFAULT NULL,

`dml_type` varchar(100) DEFAULT NULL,

`dml_count` bigint(20) DEFAULT NULL,

`start_time` datetime DEFAULT NULL,

`create_time` datetime DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `ix_unique_1` (`ip`,`port`,`start_time`,`table_schema`,`table_name`,`dml_type`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `t_monitor_mysql_table_dml_by_hour` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`ip` varchar(100) DEFAULT NULL,

`port` int(11) DEFAULT NULL,

`table_schema` varchar(100) DEFAULT NULL,

`table_name` varchar(200) DEFAULT NULL,

`dml_type` varchar(100) DEFAULT NULL,

`dml_count` bigint(20) DEFAULT NULL,

`start_time` datetime DEFAULT NULL,

`create_time` datetime DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `ix_unique_1` (`ip`,`port`,`start_time`,`table_schema`,`table_name`,`dml_type`),

KEY `ix_1` (`ip`,`port`,`table_schema`,`table_name`,`dml_type`,`start_time`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

为什么要建立唯一索引呢?

因为累积计数,利用了insert into ... on duplicate key update dml_count=dml_count+values(dml_count);

改写binlog2sql脚本 :dbms_monitor_table_dml_stat.py

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import sys

import datetime

import pymysql

from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent

from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, \

reversed_lines, is_dml_event, event_type

from db_help import *

class Binlog2sql(object):

def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,

start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,

flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None):

"""

conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}

"""

if not start_file:

raise ValueError('Lack of parameter: start_file')

self.conn_setting = connection_settings

self.start_file = start_file

self.start_pos = start_pos if start_pos else 4 # use binlog v4

self.end_file = end_file if end_file else start_file

self.end_pos = end_pos

if start_time:

self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")

else:

self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")

if stop_time:

self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")

else:

self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")

self.only_schemas = only_schemas if only_schemas else None

self.only_tables = only_tables if only_tables else None

self.no_pk, self.flashback, self.stop_never, self.back_interval = (no_pk, flashback, stop_never, back_interval)

self.only_dml = only_dml

self.sql_type = [t.upper() for t in sql_type] if sql_type else []

self.binlogList = []

self.connection = pymysql.connect(**self.conn_setting)

with self.connection as cursor:

cursor.execute("SHOW MASTER STATUS")

self.eof_file, self.eof_pos = cursor.fetchone()[:2]

cursor.execute("SHOW MASTER LOGS")

bin_index = [row[0] for row in cursor.fetchall()]

if self.start_file not in bin_index:

raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)

binlog2i = lambda x: x.split('.')[1]

for binary in bin_index:

if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file):

self.binlogList.append(binary)

cursor.execute("SELECT @@server_id")

self.server_id = cursor.fetchone()[0]

if not self.server_id:

raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))

def process_binlog(self,ip,port):

ms = MYSQL('记录结果集MySQL IP', 3306, "DB", 1, 30) # 需要人工更改,记录结果使用长连接,提高效率

con = ms.GetConnect()

stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,

log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,

only_tables=self.only_tables, resume_stream=True, blocking=True)

flag_last_event = False

e_start_pos, last_pos = stream.log_pos, stream.log_pos

# to simplify code, we do not use flock for tmp_file.

tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))

for binlog_event in stream:

if (is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type):

event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)

event_time = str(event_time.strftime('%Y-%m-%d %H:%M:%S'))

sql ="insert into t_monitor_mysql_table_dml_by_minute(ip,port,table_schema,table_name,dml_type,start_time,create_time,dml_count) values('{0}',{1},'{2}','{3}','{4}',date_format('{5}','%Y-%m-%d %H:%i:00'),now(),1) on duplicate key update dml_count=dml_count+values(dml_count);".format(ip,port,binlog_event.schema,binlog_event.table,event_type(binlog_event),event_time)

ms.ExecNonQuery(sql, con)

sql = "insert into t_monitor_mysql_table_dml_by_hour(ip,port,table_schema,table_name,dml_type,start_time,create_time,dml_count) values('{0}',{1},'{2}','{3}','{4}',date_format('{5}','%Y-%m-%d %H:00:00'),now(),1) on duplicate key update dml_count=dml_count+values(dml_count);".format(ip, port, binlog_event.schema, binlog_event.table, event_type(binlog_event), event_time)

ms.ExecNonQuery(sql, con)

if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):

last_pos = binlog_event.packet.log_pos

if flag_last_event:

break

stream.close()

return True

def __del__(self):

pass

if __name__ == '__main__':

args = command_line_args(sys.argv[1:])

conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}

binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,

end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,

stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,

no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never,

back_interval=args.back_interval, only_dml=args.only_dml, sql_type=args.sql_type)

binlog2sql.process_binlog(ip=args.host,port=args.port)

数据库配置python脚本:db_help.py

# -*- coding:utf-8 -*-

# coding=utf-8

import datetime

import time

import types

import pymysql

class MYSQL:

def __init__(self,ip,port,database_name,type,timeout):

if database_name == "结果集DB名称":

self.host = ip

self.port = port

self.user = "db_user"

self.pwd = "db_password"

self.db = database_name

self.type = type

self.timeout=timeout

def GetConnect(self):

self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.pwd, db=self.db,

charset="utf8",read_timeout=self.timeout)

if self.type ==1:

cur = self.conn.cursor(pymysql.cursors.DictCursor)

else:

cur = self.conn.cursor()

return cur

def ExecQuery(self, sql,cur):

cur.execute(sql)

resList = cur.fetchall()

return resList

def ExecNonQuery(self, sql,cur):

cur.connection.autocommit(True)

cur.execute(sql)

def ms_close(self, cur):

self.conn.close()

cur.close()

运行脚本,进行数据收集:

python dbms_monitor_table_dml_stat.py -h 生产MySQL地址 -u DB用户 -p 密码 --stop-never --start-file 起始同步的binlog文件名 --start-pos 起始binlog的pos

六、补充

达到收集DML目的,也可以利用MySQL sys库schema_table_statistics。时间粒度可控性不强。

我之所以使用binlog2sql,收集DML只是顺手干的,要实现的目标扩展功能是准实时将表、主键、值、SQL和回滚SQL记录下来。这样方便研发排查记录的变更轨迹、为回滚需求提供自定义界面和准确地评估依据。

mysql 2100,MySQL 实现准实时的表级别DML计数相关推荐

  1. Canal+Kafka实现mysql与redis数据准实时同步

    思维导图 文章已收录Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary 前言 在很多业务情况下,我们都会在系统中加入redis缓存 ...

  2. mysql binlog 统计_对MySQL binlog日志解析,统计每张表的DML次数

    想要获取每天数据库每张表的DML的次数,统计热度表,可以使用该脚本 # coding:utf-8 # 解析binlog,统计热度表,表的DML个数 import sys import os # mys ...

  3. log解析工具 px4_详解MySQL Binlog解析工具--binlog2sql,基于表级别的数据恢复

    概述 最近碰到某个表需要做数据回退,如果回退整个数据库动作又太大,所以用了binlog2sql来实现,这里先介绍一下binlog2sql的相关内容. binlog2sql是一个开源的Python开发的 ...

  4. 利用Flume将MySQL表数据准实时抽取到HDFS

    转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取 ...

  5. flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS

    一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽 ...

  6. mysql 数据展示装置_实时生成数据宽表的方法和装置与流程

    本发明涉及计算机技术领域,尤其涉及一种实时生成数据宽表的方法和装置. 背景技术: 数据仓库是面向主题的.集成的.相对稳定的.随时间不短变化得数据集合,用以支持经营管理中的决策制定.数据仓库中的数据面向 ...

  7. mysql实时写入查询_MySQL实时写入表

    {"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],&q ...

  8. mysql同步数据到另一张表_mysql:Otter跨机房数据同步(单向)

    重要说明:需要同步的表必须要有主键 主键 主键 otter是一款基于Java且免费.开源基于数据库增量日志解析,准实时同步到本机房或异地机房的mysql/oracle数据库的解决方案. Otter目前 ...

  9. 【MySQL】MySQL分库分表详解

    目录 一.前言 1.1 数据量 1.2 磁盘 1.3 数据库连接 二.垂直拆分 or 水平拆分? 三.垂直拆分 3.1 垂直分库 3.2 垂直分表 3.3 垂直拆分的优缺点 四.水平拆分 4.1 水平 ...

最新文章

  1. 解决报错OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.
  2. 安卓开发仿微信图片拖拽_使用Android 模仿微信朋友圈图片拖拽返回
  3. jQuery从入门到忘记
  4. web项目接到请求之后执行sql特别慢_小米开源!SQL优化工具,人工智能帮你 Rewrite...
  5. MySQL DATE_ADD() 函数
  6. 一个不错的验证码的例子
  7. 使用Python和Numpy进行波士顿房价预测任务(二)【深度学习入门_学习笔记】
  8. Uncaught TypeError: Illegal invocation问题解决方法
  9. Pytorch——分类问题
  10. SpringBoot启动类自动包扫描 三种方式
  11. 【UVA11168】Airport(凸包+点到直线距离(一般式))
  12. 《数字图像处理 第三版》(冈萨雷斯)——第七章 小波和多分辨率处理
  13. linux mint 划动鼠标快捷截图
  14. 仓库管理系统怎么选?想高效管理仓库的老板,别错过这篇干货!
  15. CNSD/Echarts图的使用
  16. JESD204接口调试总结——Xilinx JESD204B IP testbench解析
  17. Linux中对mariadb数据库的管理
  18. f_sync有大用但不可以滥用
  19. vectorvn1610报价_VECTOR VN1610 CAN Network 通讯模块
  20. 1665_MIT 6.828 JOS虚拟存储的设置

热门文章

  1. 大数据学习笔记53:Flume Sink Processors(Flume接收器处理器)
  2. 普通变量与寄存器变量速度对比
  3. 【BZOJ4016】最短路径树问题,最短路+点分治
  4. python第四章单元测试_Python 单元测试
  5. 【英语学习】【Level 07】U02 Live Work L5 This is where we work
  6. Mosquitto感知客户端上下线的方法
  7. linux下make 文件写法之简单函数调用
  8. 佐治亚理工学院 计算投资公开课第五周作业 市场仿真器
  9. 快速傅立叶变换(FFT)的海面模拟
  10. python fortran混合编程输入矩阵_如何将动态数组从Python传递到Fortran动态链接库