zoukankan      html  css  js  c++  java
  • Python:mysql-replication监控MySQL的binlog变动

    设置同步账号权限

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';
    
    # 刷新权限
    flush privileges;
    

    安装

    pip install mysql-replication
    

    代码示例

    # -*- coding: utf-8 -*-
    
    import datetime
    import json
    
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent
    )
    
    
    class DateEncoder(json.JSONEncoder):
        """
        自定义类,解决报错:
        TypeError: Object of type 'datetime' is not JSON serializable
        """
    
        def default(self, obj):
            if isinstance(obj, datetime.datetime):
                return obj.strftime('%Y-%m-%d %H:%M:%S')
    
            elif isinstance(obj, datetime.date):
                return obj.strftime("%Y-%m-%d")
    
            else:
                return json.JSONEncoder.default(self, obj)
    
    
    # 配置数据库信息
    mysql_settings = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'passwd': '123456'
    }
    
    
    def main():
        # 实例化binlog 流对象
        stream = BinLogStreamReader(
            connection_settings=mysql_settings,
            server_id=100,  # slave标识,唯一
            blocking=True,  # 阻塞等待后续事件
            # 设定只监控写操作:增、删、改
            only_events=[
                DeleteRowsEvent,
                UpdateRowsEvent,
                WriteRowsEvent
            ]
        )
    
        for binlogevent in stream:
            # binlogevent.dump()  # 打印所有信息
    
            for row in binlogevent.rows:
                # 打印 库名 和 表名
                event = {"schema": binlogevent.schema, "table": binlogevent.table}
    
                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["data"] = row["values"]
    
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["data"] = row["after_values"]  # 注意这里不是values
    
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["data"] = row["values"]
    
                print(json.dumps(event, cls=DateEncoder))
                # sys.stdout.flush()
    
        # stream.close()  # 如果使用阻塞模式,这行多余了
    
    
    if __name__ == '__main__':
        main()
    """
    输出数据格式
    {
        "schema": "demo",    # 数据库名
        "table": "student",  # 表名
        "action": "update",  # 动作 insert、delete、update
        "data": {            # 数据,里边包含所有字段
            "id": 26, 
            "name": "haha", 
            "age": 34, 
            "update_time": "2019-06-06 16:59:06", 
            "display": 0
        }
    }
    """
    ————————————————
    本文转自:https://blog.csdn.net/mouday/article/details/91047757
    
  • 相关阅读:
    Ant Design Pro:Layout 组件——嵌套布局
    React实战之将数据库返回的时间转换为几分钟前、几小时前、几天前的形式。
    React实战之Ant Design—Upload上传_附件上传
    React实战之60s倒计时按钮(发送短信验证按钮)
    map方法到底会不会改变原始数组?
    【React】react开发vscode插件推荐
    【React】React 工程的 VS Code 插件及配置
    【React】react新特性实例详解(memo、lazy、suspense、hooks)
    数组常用slice和splice的区别
    【React】react-beautiful-dnd 实现组件拖拽
  • 原文地址:https://www.cnblogs.com/leilijian/p/14476708.html
Copyright © 2011-2022 走看看