zoukankan      html  css  js  c++  java
  • 基于binlog来分析mysql的行记录修改情况(python脚本分析)

     
        最近写完mysql flashback,突然发现还有有这种使用场景:有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。故也整理了下。
        昨晚写的脚本,因为个人python能力有限,本来想这不发这文,后来想想,没准会有哪位园友给出优化建议。
     


        如果转载,请注明博文来源: www.cnblogs.com/xinysu/   ,版权归 博客园 苏家小萝卜 所有。望各位支持!
     


    1 实现内容 

        有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。
        这些情况部分可以通过监控来大致了解,但是也可以基于binlog来全盘分析,binlog的格式是row模式。
        在写flashback的时候,顺带把这个也写了个脚步,使用python编写,都差不多原理,只是这个简单些,介于个人python弱的不行,性能可能还有很大的提升空间,也希望园友能协助优化下。
        先贴python脚步的分析结果图如下,分为4个部分:事务耗时情况、事务影响行数情况、DML行数情况以及操作最频繁表格情况。

    2 脚本简单描述

        脚本依赖的模块中,pymysql需要自行安装。
        创建类queryanalyse,其中有5个函数定义:_get_db、create_tab、rowrecord、binlogdesc跟closeconn。

    2.1 _get_db

        该函数用来解析输入参数值,参数值一共有7个,都是必须填写的。分别为host,user,password,port,table name for transaction,table name for records,对应的简写如下:
     
    ALL options need to assign:
     
    -h    : host, the database host,which database will store the results after analysis
    -u    : user, the db user
    -p    : password, the db user's password
    -P    : port, the db port
    -f    : file path, the binlog file
    -tr    : table name for record , the table name to store the row record
    -tt    : table name for transaction, the table name to store transactions
        比如,执行脚本:python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow,该函数负责处理各个选项的参数值情况,并存储。

    2.2 create_tab

        创建两个表格,分别用来存储 binlog file文件的分析结果。一个用来存储事务的执行开始时间跟结束时间,由选项 -tt来赋值表名;一个是用来存储每一行记录的修改情况,由选项 -tr来赋值表名。
        事务表记录内容:事务的开始时间及事务的结束时间。
        行记录表的内容:库名,表名,DML类型以及事务对应事务表的编号。
     
    root@localhost:mysql3310.sock  14:42:29 [flashback]>show create table tbrow G
    *************************** 1. row ***************************
           Table: tbrow
    Create Table: CREATE TABLE `tbrow` (
      `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
      `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
      `dbname` varchar(50) NOT NULL,
      `tbname` varchar(50) NOT NULL,
      PRIMARY KEY (`auto_id`),
      KEY `sqltype` (`sqltype`),
      KEY `dbname` (`dbname`),
      KEY `tbname` (`tbname`)
    ) ENGINE=InnoDB AUTO_INCREMENT=295151 DEFAULT CHARSET=utf8
    1 row in set (0.00 sec)
     
    root@localhost:mysql3310.sock  14:42:31 [flashback]>SHOW CREATE TABLE TBTRAN G
    *************************** 1. row ***************************
           Table: TBTRAN
    Create Table: CREATE TABLE `tbtran` (
      `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `begin_time` datetime NOT NULL,
      `end_time` datetime NOT NULL,
      PRIMARY KEY (`auto_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6390 DEFAULT CHARSET=utf8
    1 row in set (0.00 sec)

    2.3 rowrecord

        重点函数,分析binlog文件内容。这里有几个规律:
    1. 每个事务的结束点,是以 'Xid = ' 来查找
      1. 事务的开始时间,是事务内的第一个 'Table_map' 行里边的时间
      2. 事务的结束时间,是以 'Xid = '所在行的 里边的时间
    2. 每个行数据是属于哪个表格,是以 'Table_map'来查找
    3. DML的类型是按照 行记录开头的情况是否为:'### INSERT INTO'  、'### UPDATE' 、'### DELETE FROM' 
    4. 注意,单个事务可以包含多个表格多种DML多行数据修改的情况。

    2.4 binlogdesc

        描述分析结果,简单4个SQL分析。
    1. 分析修改行数据的 事务耗时情况
    2. 分析修改行数据的 事务影响行数情况
    3. 分析DML分布情况
    4. 分析 最多DML操作的表格 ,取前十个分析

    2.5 closeconn

        关闭数据库连接。

    3 使用说明

        首先,确保python安装了pymysql模块,把python脚本拷贝到文件 queryanalyse.py。
        然后,把要分析的binlog文件先用 mysqlbinlog 指令分析存储,具体binlog的文件说明,可以查看之前的博文:关于binary log那些事——认真码了好长一篇。mysqlbinlog的指令使用方法,可以详细查看文档:https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html 。
        比较常用通过指定开始时间跟结束时间来分析 binlog文件。
    mysqlbinlog --start-datetime='2017-04-23 00:00:03' --stop-datetime='2017-04-23 00:30:00' --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log
         
        分析后,可以把这个 binlog_test.log文件拷贝到其他空闲服务器执行分析,只需要有个空闲的DB来存储分析记录即可。
        假设这个时候,拷贝 binlog_test.log到测试服务器上,测试服务器上的数据库可以用来存储分析内容,则可以执行python脚本了,注意要进入到python脚本的目录中,或者指定python脚本路径。
     
    python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f= /tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow
     
        没了,就等待输出吧。
        性能是硬伤,在虚拟机上测试,大概500M的binlog文件需要分析2-3min,有待提高!

    4 python脚本

      1 import pymysql
      2 from pymysql.cursors import DictCursor
      3 import re
      4 import os
      5 import sys
      6 import datetime
      7 import time
      8 import logging
      9 import importlib
     10 importlib.reload(logging)
     11 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')
     12 
     13 
     14 usage=''' usage: python [script's path] [option]
     15 ALL options need to assign:
     16 
     17 -h     : host, the database host,which database will store the results after analysis 
     18 -u     : user, the db user
     19 -p     : password, the db user's password
     20 -P     : port, the db port
     21 -f     : file path, the binlog file
     22 -tr    : table name for record , the table name to store the row record
     23 -tt    : table name for transaction, the table name to store transactions
     24 Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow
     25 
     26 '''
     27 
     28 class queryanalyse:
     29     def __init__(self):
     30         #初始化
     31         self.host=''
     32         self.user=''
     33         self.password=''
     34         self.port='3306'
     35         self.fpath=''
     36         self.tbrow=''
     37         self.tbtran=''
     38 
     39         self._get_db()
     40         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
     41 
     42         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
     43         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
     44         logging.info('MySQL which userd to store binlog event connection is ok')
     45 
     46         self.begin_time=''
     47         self.end_time=''
     48         self.db_name=''
     49         self.tb_name=''
     50 
     51     def _get_db(self):
     52         #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
     53         #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
     54         logging.info('begin to assign values to parameters')
     55         if len(sys.argv) == 1:
     56             print(usage)
     57             sys.exit(1)
     58         elif sys.argv[1] == '--help':
     59             print(usage)
     60             sys.exit()
     61         elif len(sys.argv) > 2:
     62             for i in sys.argv[1:]:
     63                 _argv = i.split('=')
     64                 if _argv[0] == '-h':
     65                     self.host = _argv[1]
     66                 elif _argv[0] == '-u':
     67                     self.user = _argv[1]
     68                 elif _argv[0] == '-P':
     69                     self.port = int(_argv[1])
     70                 elif _argv[0] == '-f':
     71                     self.fpath = _argv[1]
     72                 elif _argv[0] == '-tr':
     73                     self.tbrow = _argv[1]
     74                 elif _argv[0] == '-tt':
     75                     self.tbtran = _argv[1]
     76                 elif _argv[0] == '-p':
     77                     self.password = _argv[1]
     78                 else:
     79                     print(usage)
     80 
     81     def create_tab(self):
     82         #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
     83         #注意,一个事务可以存储多行数据修改的情况
     84         logging.info('creating table ...')
     85         create_tb_sql ='''CREATE TABLE IF NOT EXISTS  {} (
     86                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
     87                           `begin_time` datetime NOT NULL,
     88                           `end_time` datetime NOT NULL,
     89                           PRIMARY KEY (`auto_id`)
     90                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
     91                         CREATE TABLE IF NOT EXISTS  {} (
     92                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
     93                           `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
     94                           `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
     95                           `dbname` varchar(50) NOT NULL,
     96                           `tbname` varchar(50) NOT NULL,
     97                           PRIMARY KEY (`auto_id`),
     98                           KEY `sqltype` (`sqltype`),
     99                           KEY `dbname` (`dbname`),
    100                           KEY `tbname` (`tbname`)
    101                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    102                         truncate table {};
    103                         truncate table {};
    104                         '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
    105 
    106         self.cur.execute(create_tb_sql)
    107         logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))
    108 
    109     def rowrecord(self):
    110         #处理每一行binlog
    111         #事务的结束采用 'Xid =' 来划分
    112         #分析结果,按照一个事务为单位存储提交一次到db
    113         try:
    114             tran_num=1    #事务数
    115             record_sql='' #行记录的insert sql
    116             tran_sql=''   #事务的insert sql
    117 
    118             self.create_tab()
    119 
    120             with open(self.fpath,'r') as binlog_file:
    121                 logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
    122                 logging.info('analyzing...')
    123 
    124                 for bline in binlog_file:
    125 
    126                     if bline.find('Table_map:') != -1:
    127                         l = bline.index('server')
    128                         n = bline.index('Table_map')
    129                         begin_time = bline[:l:].rstrip(' ').replace('#', '20')
    130 
    131                         if record_sql=='':
    132                             self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]
    133 
    134                         self.db_name = bline[n::].split(' ')[1].replace('`', '').split('.')[0]
    135                         self.tb_name = bline[n::].split(' ')[1].replace('`', '').split('.')[1]
    136                         bline=''
    137 
    138                     elif bline.startswith('### INSERT INTO'):
    139                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
    140 
    141                     elif bline.startswith('### UPDATE'):
    142                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
    143 
    144                     elif bline.startswith('### DELETE FROM'):
    145                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
    146 
    147                     elif bline.find('Xid =') != -1:
    148 
    149                         l = bline.index('server')
    150                         end_time = bline[:l:].rstrip(' ').replace('#', '20')
    151                         self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]
    152                         tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)
    153 
    154                         self.cur.execute(tran_sql)
    155                         self.mysqlconn.commit()
    156                         record_sql = ''
    157                         tran_num += 1
    158 
    159         except Exception:
    160             return 'funtion rowrecord error'
    161 
    162     def binlogdesc(self):
    163         sql=''
    164         t_num=0
    165         r_num=0
    166         logging.info('Analysed result printing...
    ')
    167         #分析总的事务数跟行修改数量
    168         sql="select 'tbtran' name,count(*) nums from {}  union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
    169         self.cur.execute(sql)
    170         rows=self.cur.fetchall()
    171         for row in rows:
    172             if row['name']=='tbtran':
    173                 t_num = row['nums']
    174             else:
    175                 r_num = row['nums']
    176         print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))
    177 
    178         # 计算 最耗时 的单个事务
    179         # 分析每个事务的耗时情况,分为5个时间段来描述
    180         # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
    181         sql='''select 
    182                       count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
    183                       count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
    184                       count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
    185                       count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
    186                       count(case when cost_sec >30.1 then 1 end ) cos_more,
    187                       max(cost_sec) cos_max
    188                 from 
    189                 (
    190                         select 
    191                             auto_id,timestampdiff(second,begin_time,end_time) cost_sec
    192                         from {}
    193                 ) a;'''.format(self.tbtran)
    194         self.cur.execute(sql)
    195         rows=self.cur.fetchall()
    196 
    197         for row in rows:
    198             print('The most cost time : {} '.format(row['cos_max']))
    199             print('The distribution map of each transaction costed time: ')
    200             print('Cost time between    0 and  1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
    201             print('Cost time between  1.1 and  5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
    202             print('Cost time between  5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
    203             print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
    204             print('Cost time                     > 30.1 : {} , {}%
    '.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))
    205 
    206         # 计算 单个事务影响行数最多 的行数量
    207         # 分析每个事务 影响行数 情况,分为5个梯度来描述
    208         sql='''select 
    209                     count(case when nums between 0 and 10 then 1 end ) row_1,
    210                     count(case when nums between 11 and 100 then 1 end ) row_2,
    211                     count(case when nums between 101 and 1000 then 1 end ) row_3,
    212                     count(case when nums between 1001 and 10000 then 1 end ) row_4,
    213                     count(case when nums >10001 then 1 end ) row_5,
    214                     max(nums) row_max
    215                from 
    216                   (
    217                     select 
    218                              count(*) nums
    219                     from {} group by tran_num
    220                    ) a;'''.format(self.tbrow)
    221         self.cur.execute(sql)
    222         rows=self.cur.fetchall()
    223 
    224         for row in rows:
    225             print('The most changed rows for each row: {} '.format(row['row_max']))
    226             print('The distribution map of each transaction changed rows : ')
    227             print('Changed rows between    1 and    10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
    228             print('Changed rows between   11 and   100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
    229             print('Changed rows between  101 and  1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
    230             print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
    231             print('Changed rows                       > 10001 : {} , {}%
    '.format(row['row_5'], int(row['row_5'] * 100 / t_num)))
    232 
    233         # 分析 各个行数 DML的类型情况
    234         # 描述 delete,insert,update的分布情况
    235         sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
    236         self.cur.execute(sql)
    237         rows=self.cur.fetchall()
    238 
    239         print('The distribution map of the {} changed rows : '.format(r_num))
    240         for row in rows:
    241 
    242             if row['sqltype']==1:
    243                 print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
    244             if row['sqltype']==2:
    245                 print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
    246             if row['sqltype']==3:
    247                 print('DELETE rows :{} , {}%
     '.format(row['nums'],int(row['nums']*100/r_num)))
    248 
    249         # 描述 影响行数 最多的表格
    250         # 可以分析是哪些表格频繁操作,这里显示前10个table name
    251         sql = '''select 
    252                       dbname,tbname ,
    253                       count(*) ALL_rows,
    254                       count(*)*100/{} per,
    255                       count(case when sqltype=1 then 1 end) INSERT_rows,
    256                       count(case when sqltype=2 then 1 end) UPDATE_rows,
    257                       count(case when sqltype=3 then 1 end) DELETE_rows
    258                 from {} 
    259                 group by dbname,tbname 
    260                 order by ALL_rows desc 
    261                 limit 10;'''.format(r_num,self.tbrow)
    262         self.cur.execute(sql)
    263         rows = self.cur.fetchall()
    264 
    265         print('The distribution map of the {} changed rows : '.format(r_num))
    266         print('tablename'.ljust(50),
    267               '|','changed_rows'.center(15),
    268               '|','percent'.center(10),
    269               '|','insert_rows'.center(18),
    270               '|','update_rows'.center(18),
    271               '|','delete_rows'.center(18)
    272               )
    273         print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    274         for row in rows:
    275             print((row['dbname']+'.'+row['tbname']).ljust(50),
    276                   '|',str(row['ALL_rows']).rjust(15),
    277                   '|',(str(int(row['per']))+'%').rjust(10),
    278                   '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5),
    279                   '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5),
    280                   '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5),
    281                   )
    282         print('
    ')
    283 
    284         logging.info('Finished to analyse the binlog file !!!')
    285 
    286     def closeconn(self):
    287         self.cur.close()
    288         logging.info('release db connections
    ')
    289 
    290 def main():
    291     p = queryanalyse()
    292     p.rowrecord()
    293     p.binlogdesc()
    294     p.closeconn()
    295 
    296 if __name__ == "__main__":
    297     main()
  • 相关阅读:
    写给大家看的设计书(第3版)
    C#总结2012
    python 网络编程学习 1
    深入 聚集索引与非聚集索引(一)
    深研TCP/IP详解卷1开篇
    在实际项目中如何应用门面模式(Facade)
    MVC系列_权限管理之权限控制
    gae上部署了一个定时往手机发送天气预报的小python程序
    ASP.NET开发工具整理第二季
    python 实现文件的递归拷贝
  • 原文地址:https://www.cnblogs.com/xinysu/p/6908722.html
Copyright © 2011-2022 走看看