zoukankan      html  css  js  c++  java
  • 基于python的mysql复制工具

    一简介

    python-mysql-replication 是由python实现的 MySQL复制协议工具,我们可以用它来解析binlog 获取日志的insert,update,delete等事件 ,并基于此做其他业务需求。比如数据更改时失效缓存,监听dml事件通知下游业务方做对应处理。

    其项目信息

    网址     http://www.github.com/noplay/python-mysql-replication
    官方文档 https://python-mysql-replication.readthedocs.io
    

    二 实践

    2.1 安装配置

    获取源代码

    git clone http://www.github.com/noplay/python-mysql-replication

    使用pip 安装

    pip install mysql-replication

    权限:
    可以直接使用复制账号也可以使用其他账号,但是该账号必须SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'replicator'@'%' IDENTIFIED BY 'xxxxx';

    数据库日志相关的参数设置如下:

    log_bin=on ,binlog_format=row,binlog_row_image=FULL

    2.2 核心类介绍

    python-mysql-replication 的入口是类BinLogStreamReader(),我们在使用该工具时需要实例化一个BinLogStreamReader()对象 stream,BinLogStreamReader 通过 ReportSlave 向主库注册作为一个slave角色,用于接受MySQL的binlog广播。有兴趣的可以研究其代码具体实现。

    该实例提供解析 binlog 各种事件的集合,每个事件也是一个对象。

    初始化BinLogStreamReader()实例需要使用的参数如下:

    connection_settings: 数据库的连接配置信息
    resume_stream:从位置或binlog的最新事件或旧的可用事件开始
    log_file:设置复制开始日志文件
    log_pos:设置复制开始日志pos(resume_stream应该为true)
    auto_position:使用master_auto_position gtid设置位置
    blocking:如果设置为True,会持续监听binlog事件,如果设置为False 则会一次性解析所有可获取的binlog。
    only_events:只解析指定的事件 比如only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],参数类型是一个数组。
    
    #### 以上是比较常用的参数
    
    ignored_events:设置哪些事件可以被忽略。也是一个数组。
    
    only_tables,ignored_tables,only_schemas,ignored_schemas ##根据字面意思理解
    
    freeze_schema:如果为true,则不支持ALTER TABLE速度更快。
    skip_to_timestamp:在达到指定的时间戳之前忽略所有事件,否则会解析所有可访问的binlog
    report_slave:用于向主库注册SHOW SLAVE HOSTS中slave,该值可以是字典比如{'hostname':'127.0.0.1', 'username':'root', 'password':'rep', 'port':3306}
    
    slave_uuid:在SHOW SLAVE HOSTS中slave_uuid。
    fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常。
    
    

    2.3 如何使用呢?

    最简单的用法 脚本名 pyreplica.py

    from pymysqlreplication import BinLogStreamReader
    MYSQL_SETTINGS = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "passwd": ""
    }
    
    def main():
        # server_id is your slave identifier, it should be unique.
        # set blocking to True if you want to block and wait for the next event at
        # the end of the stream
        stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                    server_id=3,
                                    blocking=True)
    
        for binlogevent in stream:
            binlogevent.dump()
        stream.close() ###如果blocking=True ,改行记录可以不用。
    if __name__ == "__main__":
        main()
    
    
    

    开启两个窗口,一个窗口执行,另外一个窗口操作mysql 写入或者修改数据

    python pyreplica.py

    输出如下:

    === GtidEvent ===
    Date: 2019-06-25T17:41:34
    Log position: 339
    Event size: 42
    Read bytes: 25
    Commit: False
    GTID_NEXT: cc726403-93d1-11e9-90b7-ecf4bbde7778:13
    ()
    === QueryEvent ===
    Date: 2019-06-25T17:41:34
    Log position: 411
    Event size: 49
    Read bytes: 49
    Schema: test
    Execution time: 0
    Query: BEGIN
    ()
    === TableMapEvent ===
    Date: 2019-06-25T17:41:34
    Log position: 456
    Event size: 22
    Read bytes: 21
    Table id: 126
    Schema: test
    Table: x
    Columns: 2
    ()
    === WriteRowsEvent ===
    Date: 2019-06-25T17:41:34
    Log position: 500
    Event size: 21
    Read bytes: 12
    Table: test.x
    Affected columns: 2
    Changed rows: 1
    Values:
    --
    ('*', u'a', ':', 1)
    ('*', u'id', ':', 18)
    ()
    === XidEvent ===
    Date: 2019-06-25T17:41:34
    Log position: 531
    Event size: 8
    Read bytes: 8
    Transaction ID: 1293393
    ()
    

    2.3 拓展

    基于该工具提供的日志事件解析我们可以做很多事情,比较有名的工具 binlog2sql 利用该工具解析binlog 做数据回滚 。

    mysql-replication.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    )
    import sys
    import json
    
    mysql_settings = {'host': '127.0.0.1','port': 3306, 
                      'user': 'replica', 'passwd': 'xxxx'}
    def main():
    
        stream = BinLogStreamReader(
            connection_settings=mysql_settings,
            server_id=1,
            blocking=True,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
    
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["values"] = dict(row["values"].items())
                    event = dict(event.items())
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"].items())
                    event["after_values"] = dict(row["after_values"].items())
                    event = dict(event.items())
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"].items())
                    event = dict(event.items())
                print json.dumps(event)
                sys.stdout.flush()
    
    if __name__ == "__main__":
        main()
    
    

    执行脚本结果 如下图

    除了解析binlog,我们还可以用python-mysql-replication 做数据全量加增量迁移。比如仅仅迁移某些大表而不是整个库的时候,可以用到。有兴趣的朋友可以想想大概的算法。

  • 相关阅读:
    二维数组的查找问题
    将字符串编码成数值,求数值最大和问题(今日头条笔试题)
    链表的倒序打印
    求方程的近似解
    多边形构成问题(今日头条笔试题)
    各种语言数据类型大小
    luoguP1551 亲戚
    Codeforces 764 A-B
    Mixing Chemicals
    Day 8 of JZJX
  • 原文地址:https://www.cnblogs.com/yangyi402/p/11088728.html
Copyright © 2011-2022 走看看