转载自:http://blog.csdn.net/jiangpeng59/article/details/53318761
foreachRDD通常用来把SparkStream运行得到的结果保存到外部系统比如HDFS、Mysql、Redis等等。了解下面的知识可以帮助我们避免很多误区
误区1:实例化外部连接对象的位置不正确,比如下面代码
- dstream.foreachRDD { rdd =>
- val connection = createNewConnection() // executed at the driver
- rdd.foreach { record =>
- connection.send(record) // executed at the worker
- }
- }
误区2:为每条记录都创建一个连接对象
- dstream.foreachRDD { rdd =>
- rdd.foreach { record =>
- val connection = createNewConnection()
- connection.send(record)
- connection.close()
- }
- }
然后,给出了一个比较好的方法,为每一个分区创建一个连接对象,其具体代码如下
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- connection.close()
- }
- }
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- ConnectionPool.returnConnection(connection) // return to the pool for future reuse
- }
- }
下面给出网上一段把SparkStream的结果保存到Mysql中的代码示例
- package spark.examples.streaming
- import java.sql.{PreparedStatement, Connection, DriverManager}
- import java.util.concurrent.atomic.AtomicInteger
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- object SparkStreamingForPartition {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("NetCatWordCount")
- conf.setMaster("local[3]")
- val ssc = new StreamingContext(conf, Seconds(5))
- //The DStream is a collection of RDD, which makes the method foreachRDD reasonable
- val dstream = ssc.socketTextStream("192.168.26.140", 9999)
- dstream.foreachRDD(rdd => {
- //embedded function
- def func(records: Iterator[String]) {
- var conn: Connection = null
- var stmt: PreparedStatement = null
- try {
- val url = "jdbc:mysql://192.168.26.140:3306/person";
- val user = "root";
- val password = ""
- conn = DriverManager.getConnection(url, user, password)
- records.flatMap(_.split(" ")).foreach(word => {
- val sql = "insert into TBL_WORDS(word) values (?)";
- stmt = conn.prepareStatement(sql);
- stmt.setString(1, word)
- stmt.executeUpdate();
- })
- } catch {
- case e: Exception => e.printStackTrace()
- } finally {
- if (stmt != null) {
- stmt.close()
- }
- if (conn != null) {
- conn.close()
- }
- }
- }
- val repartitionedRDD = rdd.repartition(3)
- repartitionedRDD.foreachPartition(func)
- })
- ssc.start()
- ssc.awaitTermination()
- }
- }
注意的细节:
Dstream和RDD一样是延迟执行,只有遇到action操作才会真正去计算。因此在Dstream的内部RDD必须包含Action操作才能是接受到的数据得到处理。即使代码中包含foreachRDD,但在内部却没有action的RDD,SparkStream只会简单地接受数据数据而不进行处理