zoukankan      html  css  js  c++  java
  • spark从hbase读取数据以及zookeeper坑

    1、遇到错误,认真查看日志,这才是解决问题的王道啊!

    不过很多时候,尤其是开始摸索的时候,一下要接触的东西太多了,学东西也太赶了,加上boss不停的催进度,结果欲速则不达,最近接触大数据,由于平台是别人搭建的,搭没搭好不知道,也不清楚细节,出了问题也不知道是自己这边的还是平台的问题。有的时候就是小问题忽略了,结果花了好多时间又才重新发现。

    提交job:

    ./spark-submit --class myapp.KMeansWeather --master yarn --deploy-mode cluster ./hbase_handles_cc.jar  (集群)

    or

    ./spark-submit --class myapp.KMeansWeather --master local[1] ./hbase_handles_cc.jar (本地)

    提交本地过后老是出现问题,就是程序不断的提示错误,又不会停止,zookeeper客户端连接一直报无法定位登录配置信息。

    INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)

    还说是未知错误,我就更不知道错误在哪了,后来折腾了半天,第二天才看了下zookeeper的日志,zookeeper.out,如下:

    2016-08-25 10:13:00,150 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /10.3.9.231:33070
    2016-08-25 10:13:00,150 [myid:1] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@362] - Exception causing close of session 0x0 due to java.io.IOException: ZooKeeperServer not running

    然后我用./bin/zkServer.sh stop 与./bin/zkServer.sh start重新启动,然后查看状态,./bin/zkServer.sh status,发现

    [root@hadoop bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /root/traffic-platform/zookeeper-3.4.8/bin/../conf/zoo.cfg
    Error contacting service. It is probably not running.

    于是向安装环境那个人问了下,然后他最后说是集群里node3网线掉了…………

    目前我们测试集群用了4台机子,有一台后来别人在用,所以配置的4台结果只有3台,分别是master,node2,node3,结果这次node3网线掉了,所以只有两个节点了,因为zookeeper在机子少于集群的一半就无法选举出leader来了吧。

    把网线接上后,就可以了,再次查看每台机子的状态。如下:一个是leader,其它的为follower

    [root@hadoop bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /root/traffic-platform/zookeeper-3.4.8/bin/../conf/zoo.cfg
    Mode: leader

    其它:

    查网上信息:说客户端程序的zookeeper版本不同也会导致连接不上,一直报错。

    另外:看自己的hbase-site.xml的配置文件路径是否加入到了编译的path里面,当对Hbase进行操作时,它会指定相应的属性包括集群ip,以及端口等

     

    注意看红色部分有那个黄色块的表示加入到了path中了。

    2、spark读取Hbase

    初学这个,在网上查了下资料,官网的资料也是难得翻所认没翻到啊。

    于是查看网上的信息来写,与结合mllib中的JavaSparkPI算法

        public static void getValueFromHB() throws IOException {
            final Logger logger = LoggerFactory.getLogger(KMeansWeather.class);
            String f="BaseInfo";
            String table="NewCityWeather";
            Scan scan=new Scan();
            scan.addFamily(Bytes.toBytes(f));
            //scan.addColumn(f,)
    
            Configuration conf= HBaseConfiguration.create();//读取hbase-site.xml等配置
            //conf.set("hbase.zookeeper.quorum","10.3.9.135,10.3.9.231,10.3.9.232");//这些在hbase-site.xml中是有的
            //conf.set("hbase.zookeeper.property.clientPort","2222");
            conf.set(TableInputFormat.INPUT_TABLE,table);//设置查询的表
            conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));//设置扫描的列
            //SparkConf confsp=new SparkConf().setAppName("SparkHBaseTest").setMaster("yarn-client");
            //SparkConf confsp=new SparkConf().setAppName("SparkHBaseTest").setMaster("spark://10.3.9.135:7077");
         //设置应用名称,就是在spark web端显示的应用名称,当然还可以设置其它的,在提交的时候可以指定,所以不用set上面两行吧
         SparkConf confsp=new SparkConf().setAppName("SparkHBaseTest");
         //创建spark操作环境对象 JavaSparkContext sc
    = new JavaSparkContext(confsp); // JavaSparkContext sc = new JavaSparkContext("yarn-client", "hbaseTest", // System.getenv("SPARK_HOME"), System.getenv("JARS")); //sc.addJar("D:\jiuzhouwork\other\sparklibex\spark-examples-1.6.1-hadoop2.7.1.jar");
          
         //从数据库中获取查询内容生成RDD JavaPairRDD<ImmutableBytesWritable,Result> myRDD=sc.newAPIHadoopRDD(conf,TableInputFormat.class,ImmutableBytesWritable.class,Result.class);
         //遍历数据 collect foreach
         List<Tuple2<ImmutableBytesWritable, Result>> output=myRDD.collect();
         for (Tuple2 tuple: output ) {
           System.out.println(tuple._1+":"+tuple._2);
         }

         System.out.println(
    "sss:"+myRDD.count()); logger.info("lwwwww:"+myRDD.count());//输出RDD数据条数 //System.out.println("sss:"); //logger.info("lwwwww:"); JavaRDD rdd=JavaRDD.fromRDD(JavaPairRDD.toRDD(myRDD),myRDD.classTag()); // JavaRDD<Vector> points = myRDD.map(new ParsePoint()); // // KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL()); // // System.out.println("Cluster centers:"); // for (Vector center : model.clusterCenters()) { // System.out.println(" " + center); // } // double cost = model.computeCost(points.rdd()); // System.out.println("Cost: " + cost); // // sc.stop(); }

     注意

    SparkConf confsp=new SparkConf().setAppName("SparkHBaseTest")
    .setMaster("local")//以本地的形式运行
         .setMaster("spark://10.3.9.135:7077");//以standalone的方式运行,就是spark的集群调度方式
         .setMaster("yarn-client");//以yarn的集群方式运行,yarn
            .setJars(new String[]{"D:\jiuzhouwork\workspace\hbase_handles\out\artifacts\hbase_handles_jar\hbase_handles.jar"});//设置提交的jar


    遍历数据:collect foreach 结果

    问题: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytes

    由于没有设置序列化的类。
    在conf/spark-defaults.conf文件中设置

    如果没有spark-defaults.conf,则复制一份spark-defaults.conf.template更名为spark-defaults.conf

    问题: 从HBASE中读出数据信息但没有值?

    解决:通过Map,MapToPair,FlatMapToPair然后进行键值处理与映射,形成新的RDD,通过处理ImmutableBytesWritable.getValue(family,qualifier)来获得信息。以前在Map中的操作对JavaPairRDD现在只能放在MapToPair里面了,所以如果用Map有错,试着用一下MapToPair

  • 相关阅读:
    eclipse A Java Runtime Environment(JRE)
    【Android】自己定义圆形ImageView(圆形头像 可指定大小)
    addEventListener()、attachEvent()和removeEventListener()、detachEvent()的差别?
    Android 自己定义控件实现刮刮卡效果 真的就仅仅是刮刮卡么
    qt自己定义搜索框(超简单,带效果图)
    OpenCV基础篇之像素操作对照度调节
    NYOJ 16 矩形嵌套 (DAG上的DP)
    hdu 1247 Hat’s Words(从给的单词中找hat&#39;s word 并按字典序输出)
    Android学习路线(十八)支持不同设备——支持不同的屏幕
    移动智能设备功耗优化系列--前言(NVIDIA资深project师分享)
  • 原文地址:https://www.cnblogs.com/lwhp/p/5805941.html
Copyright © 2011-2022 走看看