  SparkStreaming (Part 1) -- Core Concepts and Operators

    1. Environment

    CDH 5.16.1
    Spark 2.3.0.cloudera4

    2. Core Concepts

    Official guide: https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html

    GitHub: https://github.com/apache/spark

    2.1 StreamingContext

    Point 1
    class StreamingContext private[streaming] (
        _sc: SparkContext,
        _cp: Checkpoint,
        _batchDur: Duration
      ) extends Logging {
    
      def this(sparkContext: SparkContext, batchDuration: Duration) = {
        this(sparkContext, null, batchDuration)
      }
    
      def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }
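
    The two auxiliary constructors above mean a StreamingContext can be built either from an existing SparkContext or directly from a SparkConf. A minimal sketch (the app name, master URL, and batch interval are arbitrary examples):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("CtxDemo")

    // either reuse an existing SparkContext ...
    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // ... or let the StreamingContext create its own SparkContext:
    // val ssc = new StreamingContext(conf, Seconds(10))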
    
    Point 2
    def stop(
          stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
         ): Unit = synchronized {
        stop(stopSparkContext, false)
      }
    
    • streamingContext.stop stops both the SparkContext and the StreamingContext by default; set spark.streaming.stopSparkContextByDefault to false to make it stop only the StreamingContext
    • a SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created; see the sketch below
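
    A minimal sketch of that pattern, stopping only the streaming side and then starting a fresh StreamingContext on the same SparkContext (the helper name restartStreaming and the 5-second batch interval are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def restartStreaming(sc: SparkContext): StreamingContext = {
      // stop any active streaming computation but keep the SparkContext alive
      // (same effect as setting spark.streaming.stopSparkContextByDefault=false)
      StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))

      // the same SparkContext can now back a new StreamingContext
      new StreamingContext(sc, Seconds(5))
    }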

    2.2 Input DStreams and Receivers

    Point 1
    Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.
    

    Every input DStream (except those backed by a file stream) is associated with a Receiver that receives the data from a source and stores it in Spark's memory for processing.


    streamingContext.socketTextStream returns a ReceiverInputDStream[String]
    streamingContext.textFileStream returns a DStream[String]
    Note: reading a stream from a file system does not require starting a receiver thread to pull the data into Spark.
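
    A short sketch contrasting the two source types (the host, port, and HDFS path are just examples):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("SourceTypes"), Seconds(10))

    // receiver-based source: a dedicated receiver thread pulls data into Spark's memory
    val socketStream: ReceiverInputDStream[String] = ssc.socketTextStream("cdh02", 6789)

    // file-based source: each batch just lists and reads new files, no receiver thread
    val fileStream: DStream[String] = ssc.textFileStream("hdfs:///data/streaming/input")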

    Point 2
    When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
    

    When running in local mode, do not use local or local[1] as the master URL. Receiver-based input DStreams (everything except file streams) need a dedicated thread to run the receiver; with local[1] the single available thread is taken by the receiver, leaving no thread to process the received data, so always use local[n] with n greater than the number of receivers.
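
    As a concrete illustration (the app name is arbitrary), a locally run receiver-based job should reserve at least one core beyond its receivers:

    import org.apache.spark.SparkConf

    // one receiver + at least one processing thread, so n must be >= 2
    val conf = new SparkConf()
      .setMaster("local[2]") // never "local" or "local[1]" with receiver-based sources
      .setAppName("ReceiverFriendlyApp")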

    3. Examples

    3.1 updateStateByKey

    A stateful transformation: the running count for each word is carried across batches.

    package com.monk.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * @className: StatefulWordCount
      * @description: TODO
      * @author wu ning
      * @date 2019/12/17 0:11
      */
    object StatefulWordCount {
    
      def main(args: Array[String]): Unit = {
    
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
        
        val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(10))
    
        // In production the checkpoint directory should be on HDFS
        ssc.checkpoint(".")
    
        val lineStream: ReceiverInputDStream[String] = ssc.socketTextStream("cdh02", 6789, StorageLevel.MEMORY_AND_DISK)
    
        //(key,1)
        val wordToOne: DStream[(String, Int)] = lineStream.flatMap(_.split(" "))
          .map((_, 1))
    
        val wordCount: DStream[(String, Int)] = wordToOne.updateStateByKey(updateFunction)
    
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * Update function for updateStateByKey
        * @param current the values that arrived for a key in the current batch
        * @param pre the accumulated value for this key from previous batches
        * @return
        */
    
      def updateFunction(current: Seq[Int], pre: Option[Int]): Option[Int] = {
        val currentValue:Int = current.sum
    
        // If pre has no value yet, fall back to the default 0
        val preValue: Int = pre.getOrElse(0)
    
        Some(currentValue + preValue)
      }
    }
    

    Local debugging error: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/F
    Cause: there is a hadoop.dll under C:\Windows\System32, or %HADOOP_HOME%/bin is on the Path environment variable. In short, a hadoop.dll found in any directory on the system Path makes the program treat the machine as a Hadoop cluster environment, but this Hadoop setup does not support Windows, which raises the exception.
    Fix: delete hadoop.dll from the directories on the Path; it is enough to make sure no hadoop.dll is reachable through the Path.

    3.2 foreachRDD

    Writing the word counts to MySQL

    package com.monk.spark
    
    import java.sql.Connection
    
    import com.monk.utils.{DataSourceUtil, SqlProxy}
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * @className: ForeachRddWordCount
      * @description: TODO
      * @author wu ning
      * @date 2019/12/17 21:06
      */
    object ForeachRddWordCount {
    
      def main(args: Array[String]): Unit = {
    
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
    
        val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(10))
    
        ssc.checkpoint(".")
    
        val lineStream: ReceiverInputDStream[String] = ssc.socketTextStream("cdh02", 9876, StorageLevel.MEMORY_AND_DISK)
    
    
        val wordToOne: DStream[(String, Int)] = lineStream.flatMap(_.split(" "))
          .map((_, 1))
    
        val wordCount: DStream[(String, Int)] = wordToOne.updateStateByKey(updateFunction)
    
        wordCount.print()
    
        wordCount.foreachRDD {
          rdd => {
            rdd.foreachPartition {
              partition => {
                // Obtain a Connection for this partition from the pool
                val connection: Connection = DataSourceUtil.getConnection()
    
                val sqlProxy = new SqlProxy
                
                // REPLACE INTO requires the table to have a PRIMARY KEY (or unique index)
                val sql = "replace into wordcount(word,counts) values(?,?)"
                partition.foreach {
                  record => {
                    sqlProxy.excuteUpdate(connection, sql, Array(record._1, record._2))
                  }
                }
                sqlProxy.shutdown(connection)
              }
            }
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * Update function for updateStateByKey
        *
        * @param current the values that arrived for a key in the current batch
        * @param pre     the previously accumulated value for this key
        * @return
        */
      def updateFunction(current: Seq[Int], pre: Option[Int]): Option[Int] = {
        val currentValue: Int = current.sum
    
        // Take the previous value if present, otherwise default to 0
        val preValue: Int = pre.getOrElse(0)
    
        Some(preValue + currentValue)
      }
    }
    

    Druid connection pool

    package com.monk.utils
    
    import java.sql.{Connection, PreparedStatement, ResultSet}
    import java.util.Properties
    
    import com.alibaba.druid.pool.DruidDataSourceFactory
    import javax.sql.DataSource
    import org.apache.log4j.Logger
    
    /**
      * @className: DataSourceUtil
      * @description: TODO
      * @author wu ning
      * @date 2019/12/17 21:23
      */
    object DataSourceUtil {
    
      // @transient marks the field as transient: it is not serialized and only serves as a locally cached value
    
      @transient
      lazy val logger = Logger.getLogger(this.getClass)
    
      private var dataSource: DataSource = _
      private val url: String = PropertiesUtil.getProperties("jdbc.url")
      private val user_name = PropertiesUtil.getProperties("jdbc.user")
      private val pass_word = PropertiesUtil.getProperties("jdbc.passwd")
    
      try {
        val props = new Properties
        props.setProperty("url", url)
        props.setProperty("username", user_name)
        props.setProperty("password", pass_word)
        //initial pool size
        props.setProperty("initialSize", "5")
        //maximum number of active connections
        props.setProperty("maxActive", "10")
        //minimum number of idle connections
        props.setProperty("minIdle", "5")
        //maximum wait time for a connection, in milliseconds
        props.setProperty("maxWait", "60000")
        //how often the eviction thread checks for connections to close, in milliseconds
        props.setProperty("timeBetweenEvictionRunsMillis", "2000")
        //minimum time a connection must sit idle in the pool before it can be evicted, in milliseconds
        props.setProperty("minEvictableIdleTimeMillis", "600000")
        //maximum time a connection may sit idle in the pool, in milliseconds
        props.setProperty("maxEvictableIdleTimeMillis", "900000")
        props.setProperty("validationQuery", "select 1")
        props.setProperty("testWhileIdle", "true")
        props.setProperty("testOnBorrow", "false")
        props.setProperty("testOnReturn", "false")
        props.setProperty("keepAlive", "true")
        props.setProperty("phyMaxUseCount", "100000")
        dataSource = DruidDataSourceFactory.createDataSource(props)
      } catch {
        case e: Exception =>
          logger.error(s"设置参数出现了问题{},$e")
      }
    
    
      // Get a connection from the pool
      def getConnection(): Connection = {
        try {
          dataSource.getConnection()
        } catch {
          case e: Exception =>
            logger.error(s"获取 connection 出现了问题{},$e")
            null
        }
      }
    
      // Close all JDBC resources
      def closeResource(resultSet: ResultSet, preStatement: PreparedStatement, connection: Connection): Unit = {
        closeResultSet(resultSet)
        closePreparedStatement(preStatement)
        closeConnection(connection)
      }

      // Close the ResultSet
      def closeResultSet(resultSet: ResultSet): Unit = {
        if (resultSet != null) {
          try {
            resultSet.close()
          } catch {
            case e: Exception =>
              logger.error(s"Failed to close the ResultSet: $e")
          }
        }
      }

      // Close the PreparedStatement
      def closePreparedStatement(preparedStatement: PreparedStatement): Unit = {
        if (preparedStatement != null) {
          try {
            preparedStatement.close()
          } catch {
            case e: Exception =>
              logger.error(s"Failed to close the PreparedStatement: $e")
          }
        }
      }

      // Close the Connection
      def closeConnection(connection: Connection): Unit = {
        if (connection != null) {
          try {
            connection.close()
          } catch {
            case e: Exception =>
              logger.error(s"Failed to close the Connection: $e")
          }
        }
      }
    }
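
    DataSourceUtil reads its JDBC settings through a PropertiesUtil helper that is not shown in this post. A minimal hypothetical sketch of such a helper, assuming the keys jdbc.url, jdbc.user and jdbc.passwd live in a config.properties file on the classpath (the file name is an assumption):

    package com.monk.utils

    import java.util.Properties

    object PropertiesUtil {

      // load config.properties (assumed name) from the classpath once, lazily
      private lazy val props: Properties = {
        val p = new Properties()
        val in = Thread.currentThread().getContextClassLoader
          .getResourceAsStream("config.properties")
        if (in != null) {
          p.load(in)
          in.close()
        }
        p
      }

      def getProperties(key: String): String = props.getProperty(key)
    }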
    

    SQL proxy class

    package com.monk.utils
    
    import java.sql.{Connection, PreparedStatement, ResultSet}
    
    import org.apache.log4j.Logger
    
    /**
      * @className: SqlProxy
      * @description: TODO
      * @author wu ning
      * @date 2019/12/17 22:06
      */
    class SqlProxy {
    
      private var rs: ResultSet = _
      private var psmt: PreparedStatement = _
    
      @transient lazy val logger = Logger.getLogger(this.getClass)
    
      /**
        * Execute an update statement
        *
        * @param connection
        * @param sql
        * @param params
        */
      def excuteUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    
        var result: Int = 0
    
        try {
          psmt = connection.prepareStatement(sql)
    
          if (params != null && params.length > 0) {
            // 0 until params.length: left-closed, right-open range
            for (i <- 0 until params.length) {
              psmt.setObject(i + 1, params(i))
            }
          }
    
          result = psmt.executeUpdate()
        } catch {
          case e: Exception =>
            logger.error(s"执行更新语句报错:{},$e")
        }
    
        result
      }
    
      /**
        * Execute a query statement
        * @param connection
        * @param sql
        * @param params
        * @param queryCallback
        */
      def executeQuery(connection: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback): Unit = {
    
        try {
          psmt = connection.prepareStatement(sql)
    
          if (params != null && params.length > 0) {
            for (i <- 0 until params.length) {
              psmt.setObject(i + 1, params(i))
            }
          }
          rs = psmt.executeQuery()
    
          queryCallback.process(rs)
        } catch {
          case e: Exception =>
            logger.error(s"Failed to execute the query: $e")
        }
      }
    
      def shutdown(connection: Connection): Unit = DataSourceUtil.closeResource(rs,psmt,connection)
    }
    
    package com.monk.utils
    
    import java.sql.ResultSet
    
    /**
      * @className: QueryCallback
      * @description: TODO
      * @author wu ning
      * @date 2019/12/17 22:04
      */
    trait QueryCallback {
      def process(rs: ResultSet): Unit
    }
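
    The executeQuery method hands its ResultSet to a QueryCallback, so queries are typically issued with an anonymous callback. A small usage sketch against the wordcount table from the earlier example (the table and column names follow that example):

    import java.sql.{Connection, ResultSet}
    import com.monk.utils.{DataSourceUtil, QueryCallback, SqlProxy}

    val connection: Connection = DataSourceUtil.getConnection()
    val sqlProxy = new SqlProxy

    sqlProxy.executeQuery(connection, "select word, counts from wordcount where word = ?",
      Array[Any]("spark"), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          // iterate over the rows handed back by SqlProxy
          while (rs.next()) {
            println(s"${rs.getString("word")} -> ${rs.getInt("counts")}")
          }
        }
      })

    sqlProxy.shutdown(connection)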
    

    3.3 Integrating Spark Streaming with Spark SQL

    Source: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
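
    The linked SqlNetworkWordCount boils down to getting (or reusing) a SparkSession inside foreachRDD, registering each batch RDD as a temporary view, and running SQL over it. A condensed sketch of that pattern (the case class and method names here are illustrative, and the stream setup is omitted):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream

    case class Record(word: String)

    def countWithSql(words: DStream[String]): Unit = {
      words.foreachRDD { rdd =>
        // reuse one SparkSession built on top of the RDD's SparkContext
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // turn this batch's RDD of words into a DataFrame and register it as a temp view
        val wordsDF = rdd.map(w => Record(w)).toDF()
        wordsDF.createOrReplaceTempView("words")

        // word count for the current batch via SQL
        spark.sql("select word, count(*) as total from words group by word").show()
      }
    }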
