zoukankan      html  css  js  c++  java
  • 把kafka数据从hbase迁移到hdfs,并按天加载到hive表(hbase与hadoop为不同集群)

    需求:
    由于我们用的阿里云Hbase,按存储收费,现在需要把kafka的数据直接同步到自己搭建的hadoop集群上,(kafka和hadoop集群在同一个局域网),然后对接到hive表中去,表按每天做分区

    一、首先查看kafka最小偏移量(offset)

    /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.210:9092 -topic test --time -2 (--time -1为查看kafka最大偏移量)
    test:2:0
    test:1:0
    test:0:0

    显示三个partition最小offset都为0,这样的话我们可以直接从kafka把数据迁移到HDFS上面

    还有另外一种情况:

    如果最小offset不为0,需要先把hbase上面的数据导入到HDFS上,再把offset设置为之前存放在zookeeper上面的offset

    方法:

    1). 首先查看存放在zookeeper上面的offset
    /usr/local/zookeeper/bin/zkCli.sh -server 192.168.1.210:2181
    #] get /consumers/test_group/offsets/test/0
    16051516
    cZxid = 0x10050df5c
    ctime = Wed Sep 19 16:15:50 CST 2018
    mZxid = 0x100691806
    mtime = Thu Nov 08 11:30:08 CST 2018
    pZxid = 0x10050df5c
    cversion = 0
    dataVersion = 6433
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 8
    numChildren = 0
    2). 把导入到Hbase的sparkstreaming任务kill掉,然后把hbase的数据全部导入到HDFS上
    3). 然后编写导入到HDFS上面的spark代码,设置相同的group_id,则会读取之前存放在zookeeper中的offset

    二. 把数据导入到HDFS上面(实现代码不在这里作展示)

    我这边保存的文件以日期为命名格式(每天凌晨执行一次,在sparkstreaming里面设置)
    hdfs dfs -ls /data/res/20181108
    drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00000
    drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00001
    drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00002

    三. 把数据进行getmerge,获取每天的数据,然后导入到hive表中去

    实现shell脚本,每天凌晨1点跑,下面按天分割文件,主要是防止当天会掺杂着昨天的数据

    ]$ cat hdfs2hive.sh 
    #! /bin/sh
    
    rm -f /data/hadoop/data/*
    
    hdfs="/usr/local/hadoop/bin/hdfs"
    hive="/usr/local/hive/bin/hive"
    
    yesterday=`date +"%Y%m%d" -d "-1 days"`
    today=`date +"%Y%m%d"`
    
    ## 合并hdfs文件夹到本地
    hdfs_dir="/data/soda_yx/$yesterday"
    
    res=`$hdfs dfs -ls $hdfs_dir`
    
    if [ -n "$res" ];then
        $hdfs dfs -getmerge $hdfs_dir "/data/hadoop/data/data/res.data"
        if [ $? -eq 0 ];then
            echo "merge to local file is success."
        fi
    fi
    
    ## 按天过滤出文件
    dir="/data/hadoop/data"
    `cat $dir/res.data |awk -F"	" '{if($36=="'"$yesterday"'") print $0}' > $dir/$yesterday`
    `cat $dir/res.data |awk -F"	" '{if($36=="'"$today"'") print $0}' > $dir/$today`
    if [ $? -eq 0 ];then
        echo "filter file is success."
    fi
    
    ## 插入数据到hive中去
    if [ -n "$dir/$yesterday" ];then
        $hive -e "LOAD DATA LOCAL INPATH '$local_file' INTO TABLE xxx.soda_report partition(dt='$yesterday')"
        if [ $? -eq 0 ];then
            echo "Import local data to hive is success."
        fi
    fi
       
    if [ -n "$dir/$today" ];then
        $hive -e "LOAD DATA LOCAL INPATH '$dir/$today' INTO TABLE xxx.soda_report partition(dt='$today')"
        if [ $? -eq 0 ];then
            echo "Import local data to hive is success."
        fi
    fi
  • 相关阅读:
    LG P4161 [SCOI2009]游戏/LG P6280 [USACO20OPEN]Exercise G
    BZOJ3473 字符串
    BZOJ4545 DQS的trie
    LG P5212 SubString
    batj ,tmd用的都是什么技术。
    java社招面试题目
    python,go,java 的发展
    互联网 后端技术必备知识
    java语言三件套
    java spring全家桶
  • 原文地址:https://www.cnblogs.com/654wangzai321/p/9970162.html
Copyright © 2011-2022 走看看