  • Importing data into HBase with Spark

    Cluster environment: one master and three workers, with Spark running in Spark on YARN mode.

    There are several ways to import data into HBase with Spark:

    1. Small amounts of data: simply call the HBase client API's single or batch put methods (a sketch follows this list).

    2. Large amounts of data: first generate HFiles, then load the HFiles into HBase.
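
    For the first approach, the sketch below shows a single batch put through the standard HBase client API. It is only a minimal illustration: the ZooKeeper quorum, table name, and column family are assumed to match the ones used later in this post, and the rowkey and values are hypothetical, so adjust them to your own schema.

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseSmallPutDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3:2181");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("DBSTK.STKFSTEST"))) {
                List<Put> puts = new ArrayList<>();
                Put put = new Put(Bytes.toBytes("600000FS20180601093000"));// hypothetical rowkey
                put.addColumn(Bytes.toBytes("FS"), Bytes.toBytes("SP"), Bytes.toBytes("10.50"));
                put.addColumn(Bytes.toBytes("FS"), Bytes.toBytes("TQ"), Bytes.toBytes("1200"));
                puts.add(put);
                table.put(puts);// one batch put is enough for small data volumes
            }
        }
    }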

    The rest of this post focuses on the second approach.

    It mainly relies on two methods of the Spark Java API:

    1. textFile: turns a local or HDFS file into an RDD

    2. flatMapToPair: for each input line, returns an Iterator over all of its key-value pairs (needed when a row spans multiple column families or columns)

    The code is as follows:

    package scala;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.storage.StorageLevel;
    
    public class HbaseBulkLoad {
        
        private static final String ZKconnect="slave1,slave2,slave3:2181";
        private static final String HDFS_ADDR="hdfs://master:8020";
        private static final String TABLE_NAME="DBSTK.STKFSTEST";// table name
        private static final String COLUMN_FAMILY="FS";// column family
        
        public static void run(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", ZKconnect);
            configuration.set("fs.defaultFS", HDFS_ADDR);
            configuration.set("dfs.replication", "1");
            
            String inputPath = args[0];
            String outputPath = args[1];
            Job job = Job.getInstance(configuration, "Spark Bulk Loading HBase Table:" + TABLE_NAME);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);// map output key class
            job.setMapOutputValueClass(KeyValue.class);// map output value class
            job.setOutputFormatClass(HFileOutputFormat2.class);
            
            FileInputFormat.addInputPaths(job, inputPath);// input path
            FileSystem fs = FileSystem.get(configuration);
            Path output = new Path(outputPath);
            if (fs.exists(output)) {
                fs.delete(output, true);// delete the output path if it already exists
            }
            fs.close();
            FileOutputFormat.setOutputPath(job, output);// HFile output path
            
            // initialize the SparkContext; the master is supplied by spark-submit (e.g. --master yarn-client)
            SparkConf sparkConf = new SparkConf().setAppName("HbaseBulkLoad");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            // read the input data file
            JavaRDD<String> lines = jsc.textFile(inputPath);
            lines.persist(StorageLevel.MEMORY_AND_DISK_SER());
            JavaPairRDD<ImmutableBytesWritable,KeyValue> hfileRdd = 
                    lines.flatMapToPair(new PairFlatMapFunction<String, ImmutableBytesWritable, KeyValue>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String text) throws Exception {
                    List<Tuple2<ImmutableBytesWritable, KeyValue>> tps = new ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>>();
                    if(null == text || text.length()<1){
                        return tps.iterator();// must return an empty iterator rather than null
                    }
                    String[] resArr = text.split(",");
                    if(resArr != null && resArr.length == 14){
                        byte[] rowkeyByte = Bytes.toBytes(resArr[0]+resArr[3]+resArr[4]+resArr[5]);
                        byte[] columnFamily = Bytes.toBytes(COLUMN_FAMILY);
                        ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkeyByte);
                        //EP,HP,LP,MK,MT,SC,SN,SP,ST,SY,TD,TM,TQ,UX (in lexicographic order)
                        // Note: the rowkey, column family, and column qualifiers must all be added in lexicographic order; if there are multiple column families, those must be ordered as well. Rowkey ordering is delegated to Spark's sortByKey below.
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("EP"),Bytes.toBytes(resArr[9]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("HP"),Bytes.toBytes(resArr[7]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("LP"),Bytes.toBytes(resArr[8]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("MK"),Bytes.toBytes(resArr[13]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("MT"),Bytes.toBytes(resArr[4]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("SC"),Bytes.toBytes(resArr[0]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("SN"),Bytes.toBytes(resArr[1]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("SP"),Bytes.toBytes(resArr[6]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("ST"),Bytes.toBytes(resArr[5]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("SY"),Bytes.toBytes(resArr[2]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("TD"),Bytes.toBytes(resArr[3]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("TM"),Bytes.toBytes(resArr[11]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("TQ"),Bytes.toBytes(resArr[10]))));
                        tps.add(new Tuple2<>(ibw,new KeyValue(rowkeyByte, columnFamily, Bytes.toBytes("UX"),Bytes.toBytes(resArr[12]))));
                    }
                    return tps.iterator();
                }
            }).sortByKey();
            
            Connection connection = ConnectionFactory.createConnection(configuration);
            TableName tableName = TableName.valueOf(TABLE_NAME);
            HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
    
            // write the HFiles
            hfileRdd.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
            
            // bulk load start
            Table table = connection.getTable(tableName);
            Admin admin = connection.getAdmin();
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
            load.doBulkLoad(new Path(outputPath), admin, table, connection.getRegionLocator(tableName));
            
            // release HBase and Spark resources
            table.close();
            admin.close();
            connection.close();
            jsc.close();
        }
        
        public static void main(String[] args) {
            try {
                long start = System.currentTimeMillis();
                // hard-coded HDFS input file and HFile output directory used for this test
                args = new String[]{"hdfs://master:8020/test/test.txt","hdfs://master:8020/test/hfile/test"};
                run(args);
                long end = System.currentTimeMillis();
                System.out.println("Data import finished, total time: " + (end - start) / 1000 + "s");
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    
    }
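
    One thing the code above assumes: HFileOutputFormat2.configureIncrementalLoad reads the region boundaries from the target table's RegionLocator, so the table and its FS column family must already exist before the job runs. The sketch below is one way to create it with the HBase 1.x Admin API; the pre-split keys are made-up placeholders (not from the original post) and should be chosen to match the real rowkey distribution.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class CreateBulkLoadTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3:2181");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                TableName name = TableName.valueOf("DBSTK.STKFSTEST");
                if (!admin.tableExists(name)) {
                    HTableDescriptor desc = new HTableDescriptor(name);
                    desc.addFamily(new HColumnDescriptor("FS"));
                    // Hypothetical split points: pre-splitting spreads the generated HFiles
                    // across several regions instead of funneling everything into one region.
                    byte[][] splits = new byte[][]{Bytes.toBytes("300000"), Bytes.toBytes("600000")};
                    admin.createTable(desc, splits);
                }
            }
        }
    }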

    Package the code into a jar, upload it to the cluster, and run the following command:

    ./spark-submit --master yarn-client --executor-memory 4G --driver-memory 1G --num-executors 100 --executor-cores 4 --total-executor-cores 400 
    --conf spark.default.parallelism=1000 --class scala.HbaseBulkLoad /home/hadoop/app/hadoop/data/spark-hbase-test.jar

    This test imported only 50,000 records; when importing 15 GB of data (roughly 150 million records), the bulk load was not as fast as the equivalent MapReduce job.

  • Original article: https://www.cnblogs.com/gdlin/p/9075963.html