zoukankan      html  css  js  c++  java
  • 数据同步之DataX

           目前业务中需要进行数据同步, 考虑使用datax数据同步方式替换掉现有的同步方式

    业务场景:

        即将业务中每天生成的日志表中的数据部分字段同步到自己的库中,进行后台数据的查询

    起因:

      之前“大神”写的逻辑中使用每三分钟更新一次的策略进行数据同步,在redis中进行计数和打标记的方式进行数据的增量同步,但是最近发现经常数据会发生丢失的问题,于是进行问题的修复

    解决:

          了解到运营对于这些数据的查询实时性并没有这么高,今天查询昨天的数据这种场景比较多,于是打算使用datax凌晨同步昨天的数据到本地即可,节省资源,减少调用业务的数据库频次

          DataX: https://github.com/alibaba/DataX

          基于java的同步开源项目,基本使用起来较为容易,简单配置即可,完成之后 3万多的数据 只需要不到10秒中就完成了数据同步工作

          数据库-mysql相关的reader和writer相关配置参考:

    • https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
    • https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

         当然他还支持很多的数据库

       这里列个配置demo:

    {
        "job": {
            "setting": {
                "speed": {
                    "byte":10485760
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "123456",
                            "connection": [
                                {
                                    "querySql": [
                                        "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__20210422"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://127.0.0.1:3306/demo"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password":"123456",
                            "column": [
                                "player_id",
                                "create_time"
                            ],
                            "session": [
                                "set names utf8mb4"
                            ],
                            "connection": [
                                {
                                    "encoding": "UTF-8",
                                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                    "table": [
                                        "tt_player_log"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }

      还支持传入参数的方式动态的配置(python bin/datax.py 支持的参数:-p),例如:

    {
        "job": {
            "setting": {
                "speed": {
                    "byte":10485760
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "${username}",
                            "password": "${password}",
                            "connection": [
                                {
                                    "querySql": [
                                        "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__${dateNum}"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://${ip}:${port}/${db}"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password":"123456",
                            "column": [
                                "player_id",
                                "create_time"
                            ],
                            "session": [
                                "set names utf8mb4"
                            ],
                            "connection": [
                                {
                                    "encoding": "UTF-8",
                                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                    "table": [
                                        "${table}"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }

      同步命令:

    /bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=xxxx -Dpassword=xxx -Dip=xx.xx.xx.xx -Dport=3306 -Ddb=xx -Dtable=xx -DdateNum=20210423"

          相关的job任务会放到job目录下, 新建一个shell脚本放到script下, run_jobs.sh,将相关数据库连接配置到一个数据表中, 添加上定时任务即可

    #!/bin/bash
    
    #说明:
    #    执行同步游戏库玩家日志表数据
    
    day_num=`date -d "-1 day" +%Y%m%d`
    #day_num=`date +%Y%m%d`
    
    
    main(){
        mysql -hxx.xx.xx -Pxx -uxx -pxx -N -e "select game_name,app_id,ip,port,username,password,db_name,player_log_table from t
    xx.tt_game_db where type=2 and status=1" | while read game_name app_id ip port username password db_name player_log_table
        do
            echo "当前游戏:$game_name[$app_id] 数据库信息:$username:$password@$ip:$port/$db_name 目标表:$player_log_table"
            #echo "-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=$db_name -Dtable=$player_log_table -DdateNum=$day_num"
            /bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=
    $db_name -Dtable=$player_log_table -DdateNum=$day_num" 
    
        done
    }
    
    main

    常见问题:

       1.如果数据表中有特殊表情的时候, 可能会出现字符错误的报错信息, 这个时候需要进行设置

    "session": [
          "set names utf8mb4"
    ],

        

  • 相关阅读:
    Nginx 允许多个域名跨域访问
    mongo 命令
    PyTorch torch.cuda.device_count 返回值与实际 GPU 数量不一致
    APUE 学习笔记 —— 文件I/O
    Django transaction 误用之后遇到的一个问题与解决方法
    如何更新 CentOS 镜像源
    Supervisor 的安装与配置教程
    Sentry的安装搭建与使用
    Python, Django 性能分析工具的使用
    记一次 Apache HUE 优化之因使用 Python 魔术方法而遇到的坑
  • 原文地址:https://www.cnblogs.com/xingxia/p/mysql_data_sync.html
Copyright © 2011-2022 走看看