  • Spark & Hive: how to develop a Spark job in Scala that reads from Hive, and how to work with the YARN ResourceManager.

    • Background:

    The task: in a Hive table that ingests roughly 46 billion records a day, pick out the records whose host matches certain specific values, and only for those records parse the longitude/latitude out of the http_content field:

    An example parsing rule:

    The host to parse: api.map.baidu.com
    The pattern to extract: "result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
    "confidence":25
    Sample http_content to parse (the runs of ? are Chinese text lost to encoding in the original): renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
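
    As a quick illustration of the rule (separate from the job code below, which uses a split-based parse instead), here is a minimal standalone Scala sketch that pulls lng/lat/confidence out of such a payload with regular expressions; the object name is ours, not part of the original job:

    import scala.util.matching.Regex

    object ParseRuleSketch {
      // unanchored so the patterns can match anywhere inside the payload
      val LngLat: Regex = """"lng":([0-9.\-]+),"lat":([0-9.\-]+)""".r.unanchored
      val Confidence: Regex = """"confidence":([0-9]+)""".r.unanchored

      def main(args: Array[String]): Unit = {
        val content = """renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"confidence":25"""
        content match {
          case LngLat(lng, lat) => println(s"lng=$lng lat=$lat") // lng=120.25088311933617 lat=30.310684375444877
          case _                => println("no location found")
        }
        content match {
          case Confidence(radius) => println(s"confidence=$radius") // confidence=25
          case _                  => ()
        }
      }
    }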
    
    • Scala code for the Spark job that reads from Hive and writes the result into a Hive table:

    The IDE is IDEA 16 and the language is Scala; the dependencies are the many jars shipped with the cluster's versions of Spark and Hadoop. Add those jars to the project:

    Scala code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    
    /**
      * Holds one parsed http_content record.
      **/
    case class Temp_Http_Content_ParserResult(success: String, lnglatType: String, longitude: String, latitude: String, radius: String)
    
    /**
      * Created by Administrator on 2016/11/15.
      */
    object ParserMain {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // .setAppName("XXX_ParserHttp").setMaster("local[1]").setMaster("spark://172.21.7.10:7077").setJars(List("xxx.jar"))
        // .set("spark.executor.memory", "10g")
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)
    
        // use abc_hive_db;
        hiveContext.sql("use abc_hive_db")
        // note: the date must be written as 20161115; 2016-11-15 is the wrong format here
        val rdd = hiveContext.sql("select host,http_content from default.http where hour>='20161115' and hour<'20161116'")
    
        // the toDF() call below needs these implicits
        import hiveContext.implicits._
    
        // (success, lnglatType, longitude, latitude, radius)
        val rdd2 = rdd
          .map(s => parse_http_context(s.getAs[String]("host"), s.getAs[String]("http_content")))
          .filter(s => s._1)
          .map(s => Temp_Http_Content_ParserResult(s._1.toString(), s._2, s._3, s._4, s._5))
          .toDF()
        rdd2.registerTempTable("Temp_Http_Content_ParserResult_20161115")
        hiveContext.sql("create table Temp_Http_Content_ParserResult20161115 as select * from Temp_Http_Content_ParserResult_20161115")
    
        sc.stop()
      }
    
      /**
        * Parses the lng/lat information out of an http_content payload.
        *
        * @param host         the request host; only known hosts are parsed
        * @param http_context the raw http_content payload
        * @return a 5-tuple: (whether the match succeeded, which lng/lat format was matched,
        *         longitude, latitude, radius)
        **/
      def parse_http_context(host: String, http_context: String): (Boolean, String, String, String, String) = {
        if (host == null || http_context == null) {
          return (false, "", "", "", "")
        }
    
        //    val result2 = parse_http_context("api.map.baidu.com", "renderReverse&&renderReverse({...the sample payload shown above...})")
        //    println(result2._1 + ":" + result2._2 + ":" + result2._3 + ":" + result2._4 + ":" + result2._5)
       
        var success = false
        var lnglatType = ""
        var longitude = ""
        var latitude = ""
        var radius = ""
        val lowerCaseHost = host.toLowerCase().trim()
        val lowerCaseHttp_Content = http_context.toLowerCase()
        //    api.map.baidu.com
        //    "result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
        //    "confidence":25
        //     --renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
        if (lowerCaseHost.equals("api.map.baidu.com")) {
          val indexLng = lowerCaseHttp_Content.indexOf("\"lng\"")
          val indexLat = lowerCaseHttp_Content.indexOf("\"lat\"")
          if (lowerCaseHttp_Content.indexOf("\"location\"") != -1 && indexLng != -1 && indexLat != -1) {
            // split the payload on commas and braces, then pick out the key:value fragments we need
            val splitstr: String = ",|\\{|\\}"
            val uriItems: Array[String] = lowerCaseHttp_Content.split(splitstr)
            var tempItem: String = ""
            lnglatType = "BD"
            success = true
            for (uriItem <- uriItems) {
              tempItem = uriItem.trim()
              if (tempItem.startsWith("\"lng\":")) {
                longitude = tempItem.replace("\"lng\":", "").trim()
              } else if (tempItem.startsWith("\"lat\":")) {
                latitude = tempItem.replace("\"lat\":", "").trim()
              } else if (tempItem.startsWith("\"confidence\":")) {
                radius = tempItem.replace("\"confidence\":", "").trim()
              }
            }
          }
        }
        else if (lowerCaseHost.equals("loc.map.baidu.com")) {
          // ... (parsing for loc.map.baidu.com elided in the original)
        }

        // strip any stray quotes left around the extracted values
        longitude = longitude.replace("\"", "")
        latitude = latitude.replace("\"", "")
        radius = radius.replace("\"", "")
    
        (success, lnglatType, longitude, latitude, radius)
      }
    }

    Packaging: note that because we run on a Hadoop & Hive & Spark-on-YARN cluster, we do not need to bundle spark-hadoop-xx.jar when running spark-submit the way a standalone spark & hadoop setup would, nor do we need to set a jars parameter in the submit script; YARN resolves on its own which cluster system jars the job needs. If you depend on a third-party jar such as ab.jar, however, you can either bundle it into your assembly or list it after the --jars option of spark-submit, as in the build sketch below.
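
    A minimal build.sbt sketch of that split (the Spark artifact names are real; the version numbers here are assumptions, so match them to your cluster):

    // build.sbt (sketch): cluster-provided jars are marked "provided" so they stay out of the assembly
    name := "http-content-parser"

    scalaVersion := "2.10.6"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
      "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
      // a third-party dependency like ab.jar would be listed here normally (no "provided"),
      // or handed to spark-submit via --jars instead of being bundled
    )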

    • Writing the spark-submit script (.sh):
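
    The script body was not preserved here, so the following is only a sketch of a yarn-cluster submit; the class name matches the code above, but the jar path, resource figures, and queue name are assumptions to adapt:

    #!/bin/sh
    # submit_parser.sh (sketch): submit the job in yarn-cluster mode
    spark-submit \
      --master yarn-cluster \
      --class ParserMain \
      --num-executors 50 \
      --executor-memory 10g \
      --executor-cores 2 \
      --queue default \
      ./http-content-parser.jar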

    • What do you do when the spark-submit script fails?

    Note that this is not standalone Spark but Spark on YARN: when we submit in yarn-cluster mode, the console shows no log output at all. We have to use YARN's management tools to read the logs:

    1. Fetch the historical logs using the application id returned at submit time:
    yarn logs -applicationId application_1475071482566_3329402
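
    The aggregated log can be very large; piping it through grep (plain shell, nothing job-specific) is a quick way to surface failures:

    yarn logs -applicationId application_1475071482566_3329402 | grep -i -A 5 "exception"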

    2. Read the logs from the YARN web UI

    https://xx.xx.xx.xx:xxxxx/Yarn/ResourceManager/xxxx/cluster
    username/password: user/password
     
     
    3. Killing an application in YARN:
    The YARN ResourceManager UI shows every applicationId; use the command line to kill the job:
    yarn application -kill application_1475071482566_3807023
    More commands are documented at http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YarnCommands.html
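
    If the id is not at hand, it can also be looked up from the command line (standard YARN CLI, not specific to this cluster):

    yarn application -list -appStates RUNNING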

    Alternatively, follow the link from the ResourceManager UI into the Spark job page, where you can inspect the job's execution progress in detail and also kill the application.

    References:
    http://blog.csdn.net/sparkexpert/article/details/50964732

    Memory allocation for Spark on YARN: http://blog.javachen.com/2015/06/09/memory-in-spark-on-yarn.html?utm_source=tuicool
