zoukankan      html  css  js  c++  java
  • MySQL to Redis同步

    方式1:使用MQ在这里插入图片描述

    对于此类业务,增加一个消费订阅基本没什么成本,服务本身也不需要做任何更改。唯一需要担心的一个问题是丢消息的情况?因为现在消息是缓存数据的唯一来源,一旦出现丢消息,缓存里缺失的那条数据永远不会被补上。 MQ 集群,像 Kafka 或者 RocketMQ,它都有高可用和高可靠的保证机制,可以满足数据可靠性要求的。

    方式2:使用 Binlog 实时更新 Redis 缓存

    在这里插入图片描述
    数据更新服务只负责处理业务逻辑,更新 MySQL,完全不用管如何去更新缓存。另外起程序,支撑binlog的解析到更新redis的过程。
    常用canal负责更新缓存的服务,把自己伪装成一个 MySQL 的从节点,从 MySQL 接收 Binlog,解析 Binlog 之后,可以得到实时的数据变更信息,然后根据这个变更信息去更新 Redis 缓存。

    canal 官方图

    在这里插入图片描述
    官方:canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

    我的配置过程:

    1. 开启binlog
    [mysqld]
    log-bin = /usr/local/var/mysql/mysql_bin_log/mysql-bin #开启binlog/路径
    server_id = 1 ## 配置一个ServerID
    binlog_format=ROW #格式
    

    给 Canal 开一个专门的 MySQL 用户并授权,确保这个用户有复制 Binlog 的权限:

    CREATE USER canal IDENTIFIED BY 'canal'; 
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    

    查看是否开启binlog:
    show variables like '%log_bin%';
    在这里插入图片描述
    查看对应的binlog以及对应记录行数位置:
    show master status
    在这里插入图片描述
    安装canal:

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    tar zvfx canal.deployer-1.1.4.tar.gz
    

    配置:
    vim canal/conf/example/instance.properties

    
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=mysql-bin.000007
    canal.instance.master.position=1905
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    canal.instance.defaultDatabaseName=test
    # table regex
    canal.instance.filter.regex=.*\..
    

    启动/重启/停止脚本位于bin目录下
    使用canal/bin/startup.sh启动,启动之后看一下日志文件 canal/logs/example/example.log,如果里面没有报错,就说明启动成功并连接到我们的 MySQL 上了。

    使用客户端进行消费(Python):

    #encoding:utf-8
    #author:donghao
    
    import time
    import redis
    import json
    from canal.client import Client
    from canal.protocol import EntryProtocol_pb2
    client = Client()
    client.connect(host='127.0.0.1', port=11111)
    client.check_valid(username=b'', password=b'')
    client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\..*')
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='donghao')
    cache = redis.Redis(connection_pool=pool)
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            event_type = row_change.eventType
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    for column in row.beforeColumns:
                        format_data = {
                            column.name: column.value
                        }
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    for column in row.afterColumns:
                        format_data = {
                            column.name: column.value
                        }
                else:
                    format_data['before'] = format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                if event_type == EntryProtocol_pb2.EventType.UPDATE:
                    # 更新前
                    cache_key = "article:article_id_{}".format(format_data['before']['id'])
                    cached_article = cache.get(cache_key)
                    if cached_article:
                        print('更新前 ', json.loads(cached_article))
                    # 更新缓存
                    cache.set(cache_key, json.dumps(format_data['after']))
                    print('更新后 ', json.loads(cache.get(cache_key)))
                print(data)
        time.sleep(1)
    client.disconnect()
    

    在这里插入图片描述

  • 相关阅读:
    去哪儿网门票数据爬虫更新
    每周进度总结12
    每日进度总结20
    每日进度总结19
    每日进度总结18
    每日进度总结17
    每日进度总结16
    每日进度总结15
    每日进度总结14
    每周进度总结11
  • 原文地址:https://www.cnblogs.com/donghaoblogs/p/12824874.html
Copyright © 2011-2022 走看看