zoukankan      html  css  js  c++  java
  • 数据同步之DataX

           目前业务中需要进行数据同步, 考虑使用datax数据同步方式替换掉现有的同步方式

    业务场景:

        即将业务中每天生成的日志表中的数据部分字段同步到自己的库中,进行后台数据的查询

    起因:

      之前“大神”写的逻辑中使用每三分钟更新一次的策略进行数据同步,在redis中进行计数和打标记的方式进行数据的增量同步,但是最近发现经常数据会发生丢失的问题,于是进行问题的修复

    解决:

          了解到运营对于这些数据的查询实时性并没有这么高,今天查询昨天的数据这种场景比较多,于是打算使用datax凌晨同步昨天的数据到本地即可,节省资源,减少调用业务的数据库频次

          DataX: https://github.com/alibaba/DataX

          基于java的同步开源项目,基本使用起来较为容易,简单配置即可,完成之后 3万多的数据 只需要不到10秒中就完成了数据同步工作

          数据库-mysql相关的reader和writer相关配置参考:

    • https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
    • https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

         当然他还支持很多的数据库

       这里列个配置demo:

    {
        "job": {
            "setting": {
                "speed": {
                    "byte":10485760
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "123456",
                            "connection": [
                                {
                                    "querySql": [
                                        "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__20210422"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://127.0.0.1:3306/demo"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password":"123456",
                            "column": [
                                "player_id",
                                "create_time"
                            ],
                            "session": [
                                "set names utf8mb4"
                            ],
                            "connection": [
                                {
                                    "encoding": "UTF-8",
                                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                    "table": [
                                        "tt_player_log"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }

      还支持传入参数的方式动态的配置(python bin/datax.py 支持的参数:-p),例如:

    {
        "job": {
            "setting": {
                "speed": {
                    "byte":10485760
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "${username}",
                            "password": "${password}",
                            "connection": [
                                {
                                    "querySql": [
                                        "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__${dateNum}"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://${ip}:${port}/${db}"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password":"123456",
                            "column": [
                                "player_id",
                                "create_time"
                            ],
                            "session": [
                                "set names utf8mb4"
                            ],
                            "connection": [
                                {
                                    "encoding": "UTF-8",
                                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                    "table": [
                                        "${table}"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }

      同步命令:

    /bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=xxxx -Dpassword=xxx -Dip=xx.xx.xx.xx -Dport=3306 -Ddb=xx -Dtable=xx -DdateNum=20210423"

          相关的job任务会放到job目录下, 新建一个shell脚本放到script下, run_jobs.sh,将相关数据库连接配置到一个数据表中, 添加上定时任务即可

    #!/bin/bash
    
    #说明:
    #    执行同步游戏库玩家日志表数据
    
    day_num=`date -d "-1 day" +%Y%m%d`
    #day_num=`date +%Y%m%d`
    
    
    main(){
        mysql -hxx.xx.xx -Pxx -uxx -pxx -N -e "select game_name,app_id,ip,port,username,password,db_name,player_log_table from t
    xx.tt_game_db where type=2 and status=1" | while read game_name app_id ip port username password db_name player_log_table
        do
            echo "当前游戏:$game_name[$app_id] 数据库信息:$username:$password@$ip:$port/$db_name 目标表:$player_log_table"
            #echo "-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=$db_name -Dtable=$player_log_table -DdateNum=$day_num"
            /bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=
    $db_name -Dtable=$player_log_table -DdateNum=$day_num" 
    
        done
    }
    
    main

    常见问题:

       1.如果数据表中有特殊表情的时候, 可能会出现字符错误的报错信息, 这个时候需要进行设置

    "session": [
          "set names utf8mb4"
    ],

        

  • 相关阅读:
    UVA 10617 Again Palindrome
    UVA 10154 Weights and Measures
    UVA 10201 Adventures in Moving Part IV
    UVA 10313 Pay the Price
    UVA 10271 Chopsticks
    Restore DB後設置指引 for maximo
    每行SQL語句加go換行
    种服务器角色所拥有的权限
    Framework X support IPV6?
    模擬DeadLock
  • 原文地址:https://www.cnblogs.com/xingxia/p/mysql_data_sync.html
Copyright © 2011-2022 走看看