  • Reading Data from HBase with Spark

    Most readers are probably familiar with Spark's two common ways of loading data into an RDD. (1) Call the parallelize function to turn an in-memory collection directly into an RDD. The Java version looks like this:

    JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));

    The Scala version:

    val myRDD= sc.parallelize(List(1,2,3))
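
    Once the collection has been parallelized, any RDD action can be applied to it. A minimal Java sketch (the reduce and count below are illustrative additions, not from the original post), assuming sc is a JavaSparkContext:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
     
    JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1, 2, 3));
    // Sum the elements with a reduce action (anonymous-class style, as used
    // throughout this article for the old Java API).
    int sum = myRDD.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;   // 1 + 2 + 3 = 6
        }
    });
    System.out.println(sum);           // 6
    System.out.println(myRDD.count()); // 3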

      This approach is simple and makes it easy to turn a collection into the initial contents of an RDD. More common is (2): reading data from files into an RDD. The file can be a plain text file or a sequence file, and it can live on the local file system (file://), on HDFS (hdfs://), or on S3. In fact, Spark supports all the file types and storage locations that Hadoop supports. The Java version looks like this:

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkFiles;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
     
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.addFile("wyp.data");
    JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

    The Scala version:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkFiles
     
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    sc.addFile("spam.data")
    val inFile = sc.textFile(SparkFiles.get("spam.data"))
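
    The examples above first ship the file with addFile and then resolve its local path through SparkFiles. textFile can just as well be pointed straight at an explicit URI; a minimal Java sketch (the paths below are placeholders, not paths from the original setup):

    // textFile accepts file://, hdfs:// and, with the right Hadoop
    // configuration, S3 URIs. The paths here are only examples.
    JavaRDD<String> localLines = sc.textFile("file:///tmp/wyp.data");
    JavaRDD<String> hdfsLines  = sc.textFile("hdfs://namenode:8020/user/iteblog/wyp.data");
    System.out.println(hdfsLines.count());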

      In practice, though, the data we need may not be sitting in plain text files on HDFS; it may well be stored in HBase. So how do we read data from HBase with Spark? All the tests in this article were run against Hadoop 2.2.0, HBase 0.98.2 and Spark 0.9.1; with other versions the code may differ slightly. This article only shows how to read data from HBase with Spark; if you need more advanced HBase operations, it may not cover your case. Without further ado, here is the Java version of the Spark code that reads from HBase:

    package com.iteblog.spark;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Serializable;
    import scala.Tuple2;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * User: iteblog
     * Date: 14-6-27
     * Time: 5:18 PM
     * blog: http://www.iteblog.com
     *
     * Usage: bin/spark-submit --master yarn-cluster --class com.iteblog.spark.SparkFromHbase
     * --jars /home/q/hbase/hbase-0.96.0-hadoop2/lib/htrace-core-2.01.jar,
     * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-common-0.96.0-hadoop2.jar,
     * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-client-0.96.0-hadoop2.jar,
     * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-protocol-0.96.0-hadoop2.jar,
     * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-server-0.96.0-hadoop2.jar
     * ./spark_2.10-1.0.jar
     */
    public class SparkFromHbase implements Serializable {
    
        /**
         * Serializes a Scan into the Base64-encoded string that
         * TableInputFormat.SCAN expects; copied from
         * org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.
         *
         * @param scan the scan to serialize
         * @return the Base64-encoded scan
         * @throws IOException
         */
        String convertScanToString(Scan scan) throws IOException {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            return Base64.encodeBytes(proto.toByteArray());
        }
    
        public void start() {
            SparkConf sparkConf = new SparkConf();
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
    
    
            Configuration conf = HBaseConfiguration.create();
    
            Scan scan = new Scan();
            //scan.setStartRow(Bytes.toBytes("195861-1035177490"));
            //scan.setStopRow(Bytes.toBytes("195861-1072173147"));
            scan.addFamily(Bytes.toBytes("cf"));
            scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
    
            try {
    
                String tableName = "wyp";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    
    
                // Read the table through TableInputFormat; each record is a
                // (row key, Result) pair.
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                        TableInputFormat.class, ImmutableBytesWritable.class,
                        Result.class);
    
                JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Tuple2<ImmutableBytesWritable, Result> row) throws Exception {
                                // Extract the value of cf:col_1 for this row.
                                byte[] o = row._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                                if (o != null) {
                                    return new Tuple2<String, Integer>(new String(o), 1);
                                }
                                // Rows without this column get an empty key; returning null
                                // here would make the following reduceByKey fail.
                                return new Tuple2<String, Integer>("", 1);
                            }
                        });
    
                JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            @Override
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        });
    
                List<Tuple2<String, Integer>> output = counts.collect();
                for (Tuple2 tuple : output) {
                    System.out.println(tuple._1() + ": " + tuple._2());
                }
    
                sc.stop();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new SparkFromHbase().start();
            System.exit(0);
        }
    }

    This code reads the col_1 column of the cf column family from the HBase table named wyp. The resulting hBaseRDD can then be used like any other RDD, for example:

    System.out.println(hBaseRDD.count());
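
    Beyond counting, the (row key, Result) pairs can be transformed like any other RDD. Here is a minimal Java sketch that pulls out the raw cf:col_1 values and saves them as text (the output path is a placeholder, and the column names follow the example above):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
     
    JavaRDD<String> values = hBaseRDD.map(
            new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
                @Override
                public String call(Tuple2<ImmutableBytesWritable, Result> row) throws Exception {
                    // Missing columns become empty strings instead of nulls.
                    byte[] v = row._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                    return v == null ? "" : new String(v);
                }
            });
    values.saveAsTextFile("hdfs:///tmp/iteblog_col_1"); // placeholder output path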

    These snippets require the following dependencies in the project's pom.xml:

    <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>0.9.1</version>
    </dependency>
     
    <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.98.2-hadoop2</version>
    </dependency>
     
    <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.2-hadoop2</version>
    </dependency>
     
    <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>0.98.2-hadoop2</version>
    </dependency>
     
    <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.2-hadoop2</version>
    </dependency>

    The Scala version:

    import org.apache.spark._
    import org.apache.spark.rdd.NewHadoopRDD
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
     
     
    object HBaseTest {
      def main(args: Array[String]) {
        val sc = new SparkContext(args(0), "HBaseTest",
          System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, args(1))
     
        // Each record is a (row key, Result) pair produced by TableInputFormat.
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
     
        hBaseRDD.count()
     
        System.exit(0)
      }
    }

    For the Scala version, add the following dependencies (for example in build.sbt):

    libraryDependencies ++= Seq(
            "org.apache.spark" % "spark-core_2.10" % "0.9.1",
            "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
            "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
            "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
            "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
    )

      When testing, make sure the HBase and Hadoop environments are configured correctly, in particular that the program can locate the hbase-site.xml configuration file; otherwise it will fail to connect.
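
    If hbase-site.xml cannot be put on the classpath, the essential client settings can also be set on the Configuration by hand. A minimal Java sketch, where the ZooKeeper host names are placeholders for your own quorum:

    // These are the standard HBase client properties normally read from
    // hbase-site.xml; the host names below are made-up examples.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
    conf.set("hbase.zookeeper.property.clientPort", "2181");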
