zoukankan      html  css  js  c++  java
  • [Spark Core] Spark 使用第三方 Jar 包的方式


    0. 说明

      Spark 下运行job,使用第三方 Jar 包的 3 种方式。


    1. 方式一

      将第三方 Jar 包分发到所有的 spark/jars 目录下

      


    2. 方式二

      将第三方 Jar 打散,和我们自己的 Jar 包打到一起

      类似的例子可以参考  在 Spark 集群上运行程序  中的打包部分


     

    3. 方式三

      在 spark-submit 命令中,通过 --jars 指定使用的第三方 Jar 包

      

      【案例:使用 spark-shell 执行 taggen】

      1. 启动 spark-shell,指定 fastjson 类库。
      定位到 fastjson jar 包

    D:maven_repositorycomalibabafastjson1.2.47fastjson-1.2.47.jar

      2. 启动spark-shell

    spark-shell --master spark://s101:7077 --jars /home/centos/fastjson-1.2.47.jar

      3. 定义函数 extractTag

    // 定义函数,抽取标签列表
    def extractTag(json: String) = {
    import com.alibaba.fastjson.JSON
    var list: scala.List[String] = Nil
    // 将字符串解析成 json 对象
    val obj = JSON.parseObject(json)
    val arr = obj.getJSONArray("extInfoList")
    if (arr != null && arr.size > 0) { // 得到数组的第一个 json 对象
    val firstObj = arr.getJSONObject(0)
    val values = firstObj.getJSONArray("values")
    if (values != null && values.size > 0) {
    var i = 0
    while (i < values.size) {
    val tag = values.getString(i)
    list = tag :: list
    i += 1;
    }
    }
    }
    list
    }

      4. 加载文件

    // 1. 加载文件
    val rdd1 = sc.textFile("/user/centos/temptags.txt")

      5. 解析每行的 json 数据成为集合

    // 2. 解析每行的json数据成为集合
    val rdd2 = rdd1.map(line => {
    val arr: Array[String] = line.split("	")
    // 商家id
    val busid: String = arr(0)
    // json
    val json: String = arr(1)
    val list: scala.List[String] = extractTag(json)
    (busid, list)
    })

      6. 过滤空集合

    // 3. 过滤空集合 (85766086,[干净卫生, 服务热情, 价格实惠, 味道赞])
    val rdd3 = rdd2.filter(t => {
    !t._2.isEmpty
    })

      7. 将值压扁

    //4. 将值压扁
    val rdd4 = rdd3.flatMapValues(list=>{
    list
    })

      8. 滤除数字的tag

    //5. 滤除数字的tag
    val rdd5 = rdd4.filter(t=>{
    try{
    //
    Integer.parseInt(t._2)
    false
    }
    catch {
    case _ => true
    }
    })

      9. 标1成对

    //6. 标1成对
    val rdd6 = rdd5.map(t=>{
    (t,1)
    })

      10. 聚合

    //7. 聚合
    val rdd7 = rdd6.reduceByKey(_+_)

      11. 重组

    //8. 重组
    val rdd8 = rdd7.map(t=>{
    (t._1._1,(t._1._2 , t._2)::Nil)
    })

      12. reduceByKey

    //9. reduceByKey
    val rdd9 =rdd8.reduceByKey(_ ::: _)

      13. 分组内排序

    //10. 分组内排序
    val rdd10=rdd9.mapValues(list=>{
    list.sortBy(t=>{
    -t._2
    }).take(5)
    })

      14. 商家间排序

    //11. 商家间排序
    val rdd11= rdd10.sortBy(t=>{
    t._2(0)._2
    } ,false)

      15. collect

    rdd11.collect()

      16. 查看 Web UI
      http://s101:8080/

      17. DAG 视图

      


    且将新火试新茶,诗酒趁年华。
  • 相关阅读:
    hdu 5534(dp)
    hdu 5533(几何水)
    hdu 5532(最长上升子序列)
    *hdu 5536(字典树的运用)
    hdu 5538(水)
    假如数组接收到一个null,那么应该怎么循环输出。百度结果,都需要提前判断。否则出现空指针异常。。我还是想在数组中实现保存和输出null。
    第一个登录页面 碰到的疑问
    浅谈堆和栈的区别
    Java中的基础----堆与栈的介绍、区别
    JS的Document属性和方法
  • 原文地址:https://www.cnblogs.com/share23/p/9768308.html
Copyright © 2011-2022 走看看