zoukankan      html  css  js  c++  java
  • Hbase基础(十九):亿级数据从 MySQL 到 Hbase 的三种同步方案与实践(三)同步利器

    原文:https://blog.csdn.net/rlnlo2pnefx9c/article/details/108288956

    4.1 简单粗暴Sqoop

    首先来回顾一下Sqoop架构图:

    架构图

    这里大家记住一个规则:大数据需要切分!如果不切分,这个亿级数据直接导入会崩溃!!!

    什么是Sqoop?

    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的开源工具,可以将一个关系型数据库中的数据导进到Hadoop的HDFS或者HBase等。

    sqoop核心参数与代码解释:

    sqoop import 
    --connect jdbc:mysql://localhost:3306/loaddb
    --username root
    --password xxxx
    --query "${sql}"
    --hbase-row-key id
    --hbase-create-table
    --column-family info
    --hbase-table mysql_data
    --split-by id -m 4

    --connect 指定连接的数据库,如果你的数据库不是本地的,记得修改地址!--username 用户名 --password  密码 --query sql语句 --hbase-row-key 指定rowkey,如果存在则修改为该值 --hbase-create-table 创建Hbase表 --column-family 列簇 --hbase-table hbase表名

    注意:当-m 设置的值大于1时,split-by必须设置字段!

    由于数据太大,需要分片导入,具体导入代码见仓库:

    up=185941000
    for((i=1; i>0; i++))
    do   
        start=$(((${i} - 1) * 40000 + 1))
        end=$((${i} * 40000))
        if [ $end -ge $up ]
        then 
            end=185941000
        fi  
     
        sql="select id,carflag, touchevent, opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus from loaddb.loadTable1 where id>=${start} and id<=${end} and $CONDITIONS";
        
        sqoop import --connect jdbc:mysql://localhost:3306/loaddb --username root --password xxxx --query "${sql}" --hbase-row-key id --hbase-create-table --column-family info --hbase-table mysql_data --split-by id  -m 4
        echo Sqoop import from: ${start} to: ${end} success....................................
        if [ $end -eq $up ]
        then 
            break
        fi  
     
    done

    思路是每隔4万导入一次,当然您也可以修改。

    耗时:(使用linux的time统计bash脚本运行时间)

     enter image description here

    导入结果:

     enter image description here

    如果遇到问题,显示虚拟内存溢出,不断新开进程,杀死之前的进程,解决方案:关闭虚拟内存。

     enter image description here

    修改yarn-site.xml

    <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
    </property>

    以上就是MySQL同步Hbase方案1。

    4.2 Kafka-thrift同步


    enter image description here

    4.2.1 binlog

    binlog是sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志。

    可以简单的理解该log记录了sql标中的更新删除插入等操作记录。通常应用在数据恢复、备份等场景。

    开启binlog

    对于我的mysql的配置文件在下面这个文件夹,当然直接编辑my.cnf也是可以的。

    1.  
      vi /etc/mysql/mysql.conf.d/mysqld.cnf
    2.  
       

    对配置文件设置如下:

    openbinlog

    查看是否启用

    进入mysql客户端输入:

    1.  
      show variables like '%log_bin%';
    2.  
       

    binlog

    binlog介绍

    我的log存放在var下面的log的mysql下面:

    loglook

    在mysql-bin.index中包含了所有的log文件,比如上述图就是包含了1与2文件,文件长度超过相应大小就会新开一个log文件,索引递增,如上面的000001,000002。

    binlog实战

    首先创建一个表:

    1.  
      create table house(id int not null primary key,house int,price int);
    2.  
       

    向表中插入数据:

    1.  
      insert into loaddb.house(id,house,price) values(1,2,3);
    2.  
       

    上面提到插入数据后,binlog会更新,那么我们去查看上面log文件,应该会看到插入操作。

    Mysql binlog日志有ROW,Statement,MiXED三种格式;

    1.  
      set global binlog_format='ROW/STATEMENT/MIXED'
    2.  
       

    命令行:

    1.  
      show variables like 'binlog_format'
    2.  
       

    row

    对于mysql5.7的,binlog格式默认为ROW,所以不用修改。

    那么为何要了解binlog格式呢,原因很简单,我要查看我的binlog日志,而该日志为二进制文件,打开后是乱码的。对于不同的格式,查看方式不一样!

    对于ROW模式生成的sql编码需要解码,不能用常规的办法去生成,需要加上相应的参数,如下代码:

    1.  
      sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v
    2.  
       

    使用mysqlbinlog工具查看日志文件:

    binlog

    4.2.2 启动thrift接口

    thrift为其他语言与hbase操纵接口。启动目的为后面数据插入做准备。

    enter image description here

    4.2.3 kafka-thrift流程小结

    使用github仓库代码将原始数据进行每2w一个文件切分!

    切分输出:

    split上述切分速度非常快,2分钟左右即可切完,可以自定义文件大小。

    编写Kafka数据入Hbase,完整代码见github仓库代码:

    def batchTokafka(self,start_time,table_name):
        table = self.conn.table(table_name)
        i = 1
        with table.batch(batch_size=1024*1024) as bat:
            for m in self.consumer:
                t = time.time()
                database = json.loads(m.value.decode('utf-8'))["database"]
                name = json.loads(m.value.decode('utf-8'))["table"]
                row_data = json.loads(m.value.decode('utf-8'))["data"]
                if database=='loaddb' and name == 'sqlbase1':
                    row_id = row_data["id"]
                    row = str(row_id)
                    print(row_data)
                    del row_data["id"]
                    data = {}
                    for each in row_data:
                        neweach = 'info:' + each
                        data[neweach] = row_data[each]
                    data['info:gpslongitude'] = str(data['info:gpslongitude'])
                    data['info:gpslatitude'] = str(data['info:gpslatitude'])
                    data['info:gpsspeed'] = str(data['info:gpsspeed'])
                    data['info:gpsorientation'] = str(data['info:gpsorientation'])
                    # self.insertData(table_name, row, data)
                    print(data)
                    bat.put(row,data)
                    if i%1000==0:
                        print("===========插入了" + str(i) + "数据!============")
                        print("===========累计耗时:" + str(time.time() - start_time) + "s=============")
                        print("===========距离上次耗时"+ str(time.time() - t)  +"=========")
                    i+=1

    上述运行后,开始MySQL数据插入,这里插入采用4个多进程进行程序插入,速度非常快。

    当MySQL数据在插入的同时,数据流向如下:

    mysql插入->入库mysql->记录binlog->maxwell提取binlog->返回json给kafka->kafka消费端通过thrift接口->写入hbase。

    上述同步的结果如下,为了明确是否真正数据同步,只看了一条数据,作为验证。

    多个进程插入图

    mutprocess

    kafka消费入hbase图

    enter image description here

    MySQL数据图

    enter image description here

    Hbase数据图

    enter image description here

    以上就是从Mysql到Hbase的同步方案2。

    4.3 Kafka-Flink

    enter image description here

    4.3.1 实时同步Flink

    方案3为方案2的改进,上述是通过Python写入Hbase,这里改成java,并使用最新的流处理技术:Flink。

    Flink在ETL场景中使用频繁,非常适合数据同步,于是在这个方案中采用Flink进行同步。

    核心代码实现,完整代码见github仓库地址:

    SingleOutputStreamOperator<Student> student = env.addSource(
    new FlinkKafkaConsumer011<>(
            "test",   //这个 kafka topic 需要和上面的工具类的 topic 一致
            new SimpleStringSchema(),
            props)).setParallelism(9)
            .map(string -> JSON.parseObject(string, Student.class))
            .setParallelism(9);
     
    long start =System.currentTimeMillis();
    student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student,
    List<Student>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Student> values,
        Collector<List<Student>> out) throws Exception {
            ArrayList<Student> students = Lists.newArrayList(values);
            if (students.size() > 0) {
                System.out.println("1s内收集到 mysql表 的数据条数是:" 
                + students.size());
                long end =System.currentTimeMillis();
                System.out.printf("已经用时time:%d
    ",end-start);
                out.collect(students);
            }
        }
    }).addSink(new SinkToHBase()).setParallelism(9);

    使用Flink进行批量入Hbase。

    4.3.2 Flink小结

    首先启动maxwell与kafka,hbase也要启动,接着在数据写入端,可以采用load data infile或者python程序插入法进行数据插入,数据会通过maxwell到kafka再到Flink,然后sink到Hbase。

    插入端为load data infile的同步

     flink_sink

    插入端为Python程序的同步

    flink_load

     

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14128382.html

  • 相关阅读:
    图论分支-Tarjan初步-边双联通分量
    图论分支-Tarjan初步-割点和割边
    简谈图论重要性&&图论总结
    Angular本地数据存储LocalStorage
    (转)AngularJS中使用的表单验证
    (转)AngularJS判断checkbox/复选框是否选中并实时显示
    (转载)JavaScript世界万物诞生记
    (转载)关于My97 datepicker与Angular ng-model绑定问题解决。
    mysql
    mysql
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14128382.html
Copyright © 2011-2022 走看看