zoukankan      html  css  js  c++  java
  • SparkStream:4)foreachRDD详解

    转载自:http://blog.csdn.net/jiangpeng59/article/details/53318761

     

    foreachRDD通常用来把SparkStream运行得到的结果保存到外部系统比如HDFS、Mysql、Redis等等。了解下面的知识可以帮助我们避免很多误区

     

    误区1:实例化外部连接对象的位置不正确,比如下面代码

    
    
    1. dstream.foreachRDD { rdd =>  
    2.   val connection = createNewConnection()  // executed at the driver  
    3.   rdd.foreach { record =>  
    4.     connection.send(record) // executed at the worker  
    5.   }  
    6. }  
    其实例化的连接对象在driver中,然后通过序列化的方式发送到各个Worker,但实际上Connection的序列化通常是无法正确序列化的

     

    误区2:为每条记录都创建一个连接对象

    
    
    1. dstream.foreachRDD { rdd =>  
    2.   rdd.foreach { record =>  
    3.     val connection = createNewConnection()  
    4.     connection.send(record)  
    5.     connection.close()  
    6.   }  
    7. }  
    虽然误区1的问题得到了解决,但通常情况下,外部系统如mysql,其连接对象是非常可贵的,如果一条记录就申请一个连接资源,系统性能会非常糟糕

     

    然后,给出了一个比较好的方法,为每一个分区创建一个连接对象,其具体代码如下

     
    
    
    1. dstream.foreachRDD { rdd =>  
    2.   rdd.foreachPartition { partitionOfRecords =>  
    3.     val connection = createNewConnection()  
    4.     partitionOfRecords.foreach(record => connection.send(record))  
    5.     connection.close()  
    6.   }  
    7. }  
    最后给出一个较优的方案,使用一个连接池来维护连接对象
     
    
    
    1. dstream.foreachRDD { rdd =>  
    2.   rdd.foreachPartition { partitionOfRecords =>  
    3.     // ConnectionPool is a static, lazily initialized pool of connections  
    4.     val connection = ConnectionPool.getConnection()  
    5.     partitionOfRecords.foreach(record => connection.send(record))  
    6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  
    7.   }  
    8. }  
    正如上面代码阐述的,连接对象推荐是使用lazy关键字来修饰,用到的时候才去实例化

     

    下面给出网上一段把SparkStream的结果保存到Mysql中的代码示例

    
    
    1. package spark.examples.streaming  
    2.   
    3. import java.sql.{PreparedStatement, Connection, DriverManager}  
    4. import java.util.concurrent.atomic.AtomicInteger  
    5.   
    6. import org.apache.spark.SparkConf  
    7. import org.apache.spark.streaming.{Seconds, StreamingContext}  
    8. import org.apache.spark.streaming._  
    9. import org.apache.spark.streaming.StreamingContext._  
    10.   
    11. object SparkStreamingForPartition {  
    12.   def main(args: Array[String]) {  
    13.     val conf = new SparkConf().setAppName("NetCatWordCount")  
    14.     conf.setMaster("local[3]")  
    15.     val ssc = new StreamingContext(conf, Seconds(5))  
    16.     //The DStream is a collection of RDD, which makes the method foreachRDD reasonable  
    17.     val dstream = ssc.socketTextStream("192.168.26.140", 9999)  
    18.     dstream.foreachRDD(rdd => {  
    19.       //embedded function  
    20.       def func(records: Iterator[String]) {  
    21.         var conn: Connection = null  
    22.         var stmt: PreparedStatement = null  
    23.         try {  
    24.           val url = "jdbc:mysql://192.168.26.140:3306/person";  
    25.           val user = "root";  
    26.           val password = ""  
    27.           conn = DriverManager.getConnection(url, user, password)  
    28.           records.flatMap(_.split(" ")).foreach(word => {  
    29.             val sql = "insert into TBL_WORDS(word) values (?)";  
    30.             stmt = conn.prepareStatement(sql);  
    31.             stmt.setString(1, word)  
    32.             stmt.executeUpdate();  
    33.           })  
    34.         } catch {  
    35.           case e: Exception => e.printStackTrace()  
    36.         } finally {  
    37.           if (stmt != null) {  
    38.             stmt.close()  
    39.           }  
    40.           if (conn != null) {  
    41.             conn.close()  
    42.           }  
    43.         }  
    44.       }  
    45.       val repartitionedRDD = rdd.repartition(3)  
    46.       repartitionedRDD.foreachPartition(func)  
    47.     })  
    48.     ssc.start()  
    49.     ssc.awaitTermination()  
    50.   }  
    51. }  

    注意的细节:

    Dstream和RDD一样是延迟执行,只有遇到action操作才会真正去计算。因此在Dstream的内部RDD必须包含Action操作才能是接受到的数据得到处理。即使代码中包含foreachRDD,但在内部却没有action的RDD,SparkStream只会简单地接受数据数据而不进行处理

     
  • 相关阅读:
    乱码解决
    Collection接口
    YTU EDG Vince Day Training -- 训练赛赛后总结
    Codeforces Round #751 (Div. 2) A. Two Subsequences
    Codeforces Round #750 (Div. 2) C. Grandma Capa Knits a Scarf
    Codeforces Round #745 (Div. 2) B. Diameter of Graph
    Codeforces Round #745 (Div. 2) A. CQXYM Count Permutations
    ytuoj-3328 快速幂
    Codeforces Round #746 (Div. 2) C. Bakry and Partitioning
    Codeforces Round #747 (Div. 2) B. Special Numbers
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723828.html
Copyright © 2011-2022 走看看