zoukankan      html  css  js  c++  java
  • spark读HFile对hbase表数据进行分析

    要求:计算hasgj表,计算每天新增mac数量。

    因为spark直接扫描hbase表,对hbase集群访问量太大,给集群造成压力,这里考虑用spark读取HFile进行数据分析。

    1、建立hasgj表的快照表:hasgjSnapshot

    语句为:snapshot 'hasgj','hasgjSnapshot'

    2、计算每天mac增量的代码如下:

    package com.ba.sparkReadHbase.operatorHfile.hfileinputformat;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.Map.Entry;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;
    
    public class SparkReadHFile {
        private static String convertScanToString(Scan scan) throws IOException {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            return Base64.encodeBytes(proto.toByteArray());
        }
    
        public static void main(String[] args) throws IOException {
            final String date=args[0];
            int max_versions = 3;
            SparkConf sparkConf = new SparkConf().setAppName("sparkReadHfile");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.rootdir", "/hbase");
            hconf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("ba"));
            scan.setMaxVersions(max_versions);
            hconf.set(TableInputFormat.SCAN, convertScanToString(scan));
            Job job = Job.getInstance(hconf);
            Path path = new Path("/snapshot");
            String snapName ="hasgjSnapshot";
            TableSnapshotInputFormat.setInput(job, snapName, path);
            JavaPairRDD<ImmutableBytesWritable, Result> newAPIHadoopRDD = sc.newAPIHadoopRDD(job.getConfiguration(), TableSnapshotInputFormat.class, ImmutableBytesWritable.class,Result.class);
            List<String> collect = newAPIHadoopRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>(){
                private static final long serialVersionUID = 1L;
                public String call(Tuple2<ImmutableBytesWritable, Result> v1)
                        throws Exception {
                    // TODO Auto-generated method stub
                    String newMac =null;
                    Result result = v1._2();
                    if (result.isEmpty()) {
                        return null;
                    }
                    String rowKey = Bytes.toString(result.getRow());
                    //System.out.println("行健为:"+rowKey);
                    NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("ba"));
                    Set<Entry<byte[], byte[]>> entrySet = familyMap.entrySet();
                    java.util.Iterator<Entry<byte[], byte[]>> it = entrySet.iterator();
                    String colunNmae =null;
                    String minDate="34561213";
                    while(it.hasNext()){
                        colunNmae = new String(it.next().getKey());//
                        if(colunNmae.compareTo(minDate)<0){
                            minDate=colunNmae;
                        }
                    }
                    
                    if (date.equals(minDate)) {
    //                    row=rowKey.substring(4);
                        newMac=rowKey;
                        //ls.add(rowKey.substring(4));
                        //bf.append(rowKey+"----");
                    }
                    return  newMac;
                }
             }).collect();
              ArrayList<String> arrayList = new ArrayList<String>();
                for (int i = 0; i < collect.size(); i++) {
                    if (collect.get(i) !=null) {
                        arrayList.add(collect.get(i));
                    }
                }
                 System.out.println("新增mac数"+(arrayList.size()));
            
        }
    }

    3、特别说明:

    hasgj表的表结构:

     0000F470ABF3A587                          column=ba:20170802, timestamp=1517558687930, value=                                                                         
     0000F470ABF3A587                          column=ba:20170804, timestamp=1517593923254, value=                                                                         
     0000F470ABF3A587                          column=ba:20170806, timestamp=1517620990589, value=                                                                         
     0000F470ABF3A587                          column=ba:20170809, timestamp=1517706294758, value=                                                                         
     0000F470ABF3A587                          column=ba:20170810, timestamp=1517722369020, value=                                                                         
     0000F470ABF3A587                          column=ba:20170811, timestamp=1517796060984, value=                                                                         
     0000F470ABF3A587                          column=ba:20170816, timestamp=1517882948856, value=                                                                         
     0000F470ABF3A587                          column=ba:20170818, timestamp=1517912603602, value=                                                                         
     0000F470ABF3A587                          column=ba:20170819, timestamp=1517938488763, value=                                                                         
     0000F470ABF3A587                          column=ba:20170821, timestamp=1517989742180, value=                                                                         
     0000F470ABF3A587                          column=ba:20170827, timestamp=1518383470292, value=                                                                         
     0000F470ABF3A587                          column=ba:20170828, timestamp=1520305841272, value=                                                                         
     0000F470ABF3A587                          column=ba:20170831, timestamp=1522115116459, value=                                                                         
     0000F4730088A5D3                          column=ba:20170805, timestamp=1517598564121, value=                                                                         
     0000F47679E83F7D                          column=ba:20170817, timestamp=1517890046587, value=                                                                         
     0000F47FBA753FC7                          column=ba:20170827, timestamp=1518365792130, value=                                                                         
     0000F48C02F8EB83                          column=ba:20170810, timestamp=1517729864592, value=                                                                         
     0000F49578E63F55                          column=ba:20170828, timestamp=1520302223714, value=                                                                         
     0000F4AC4A93F7A5                          column=ba:20170810, timestamp=1517724545955, value=                                                                         
     0000F4B4807679AA                          column=ba:20170801, timestamp=1517543775374, value=                                                                         
     0000F4B7E374C0FF                          column=ba:20170804, timestamp=1517578239073, value=                                                                         
     0000F4BDBF6EBF37                          column=ba:20170829, timestamp=1520558747936, value=                                                                         
     0000F4CB52FDDA58                          column=ba:20170806, timestamp=1517638015583, value=                                                                         
     0000F4CB52FDDA58                          column=ba:20170807, timestamp=1517677405900, value=      

    4、提交作业命令:

    ./spark-submit --master yarn-client  --num-executors 7 --executor-cores 2 --driver-memory 2g  --executor-memory 30g --class com.ba.sparkReadHbase.operatorHfile.hfileinputformat.SparkReadHFile  /home/xxx0108/ftttttttt/testJar/sparkTest9.jar 20170806

  • 相关阅读:
    dom4j解析xml字符串实例
    使用Dom4j生成xml文件(utf-8编码)
    viewer.js插件简单使用说明
    html标签title属性效果优化
    XML文件读写编码不是UTF-8的问题
    webservice wsdl文件标签讲解
    定销房概念扫盲
    Sql Server系列:视图
    Centos7安装mysql5.6
    java实现链式队列
  • 原文地址:https://www.cnblogs.com/kwzblog/p/9007713.html
Copyright © 2011-2022 走看看