zoukankan      html  css  js  c++  java
  • spark广播变量

    Spark-广播变量

    • 当我们产生了几百个或是几千个task这些task后期都需要使用到一份共同的数据,假如这个数据量有1G,这些task后期运行完成需要内存开销 几百或几千乘以1g,内存开销还是特别大的,特别浪费资源。而spark提供一个叫数据共享机制广播变量。可以把共同数据从Driver段下发到每一个参与计算的worker节点上,每个worker节点保留该数据一个副本(该副本是只读的,不可改变),后面在每一个worker上运行大量task都共享该副本数据。这样,假如我们有2个worker参与计算,该数据会下发2份,这里就大大减少内存开销。

    1.通过spark实现IP地址查询

    package cn.wc
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ip_ocation {
      // ip转换
      def ip2Long(ip:String):Long = {
        val ips:Array[String] = ip.split("\.")
        var ipNum:Long = 0L
        for (i <- ips) {
          ipNum = i.toLong | ipNum << 8L
        }
        ipNum
      }
      // 二分查
      def binarySearch(ipNum:Long, city_ip_Array:Array[(String,String,String,String)]):Int = {
        var start = 0
        var end = city_ip_Array.length - 1
        while (start <= end) {
          val middle = (start + end) / 2
          if (ipNum >= city_ip_Array(middle)._1.toLong && ipNum <= city_ip_Array(middle)._2.toLong) {
            return middle
          }
          if (ipNum < city_ip_Array(middle)._1.toLong) {
            end = middle - 1
          }
          if (ipNum > city_ip_Array(middle)._2.toLong) {
            start = middle + 1
          }
        }
        -1
      }
      def main(args: Array[String]): Unit = {
        val sparkConf:SparkConf = new SparkConf().setAppName("IpOcation").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        sc.setLogLevel("warn")
        // 读取城市IP信息文件
        val city_id_rdd:RDD[(String,String,String,String)] = sc.textFile("J:\ips.txt").map(x => x.split("\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
        // 广播变量使用:把城市ip信息数据,下发到每个worker节点
        // 广播无法广播RDD,需要通过collect转换
        val cityTpBroadcase: Broadcast[Array[(String,String,String,String)]] = sc.broadcast(city_id_rdd.collect())
        // 读取运营商日志数据
        val ipsRDD:RDD[String] = sc.textFile("J:\flow.format").map(x => x.split("\|")(1))
        // 遍历ipsDD获取每个IP地址,然后去city_ip_rdd去匹配,获取该ip对应经纬度
        val result:RDD[((String,String), Int)] = ipsRDD.mapPartitions(iter => {
          // 获取广播变量的值
          val city_ip_Array:Array[(String,String,String,String)] = cityTpBroadcase.value
          iter.map(ip => {
            // 将ip地址转换成Long类型数值
            val ipNum:Long = ip2Long(ip)
            // 通过ipNum去广播变量去匹配,获取ipNum,在广播变量数组中下标
            val index:Int = binarySearch(ipNum, city_ip_Array)
            // 获取该数据
            val value: (String,String,String,String) = city_ip_Array(index)
            // 获取经纬度,封装返回数据
            ((value._3,value._4), 1)
          })
        })
        val finalResult: RDD[((String,String), Int)] = result.reduceByKey(_+_)
    
        finalResult.foreach(println)
        // 保存数据到数据库
        finalResult.foreachPartition(iter => {
          val connection: Connection  = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/spark", "root", "123")
          val sql = "insert into flow(longitude, latitude, total) values (?,?,?)"
    
          try {
            val ps: PreparedStatement = connection.prepareStatement(sql)
            iter.foreach(line => {
              ps.setString(1, line._1._1)
              ps.setString(2, line._1._2)
              ps.setInt(3, line._2)
              ps.execute()
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            if (connection!= null) {
              connection.close()
            }
          }
        })
        sc.stop()
      }
    }
    
    

    2.spark读取文件数据保存到hbase中

    • pom.xml添加hbase依赖
    <dependency>
    	<groupId>org.apache.hbase</groupId>
    	<artifactId>hbase-client</artifactId>
    	<version>1.2.1</version>
    </dependency>
    
  • 相关阅读:
    spring cloud 和 阿里微服务spring cloud Alibaba
    为WPF中的ContentControl设置背景色
    java RSA 解密
    java OA系统 自定义表单 流程审批 电子印章 手写文字识别 电子签名 即时通讯
    Hystrix 配置参数全解析
    spring cloud 2020 gateway 报错503
    Spring Boot 配置 Quartz 定时任务
    Mybatis 整合 ehcache缓存
    Springboot 整合阿里数据库连接池 druid
    java OA系统 自定义表单 流程审批 电子印章 手写文字识别 电子签名 即时通讯
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14916344.html
Copyright © 2011-2022 走看看