zoukankan      html  css  js  c++  java
  • 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

    转载自:http://blog.csdn.net/erfucun/article/details/52312682

    本博文主要内容包括:

    • 技术实现foreachRDD与foreachPartition解析
    • foreachRDD与foreachPartition实现实战

    一:技术实现foreach解析:

    1、首先我们看一下Output Operations on DStreams提供的API: 
    这里写图片描述 
    这里写图片描述

    SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统。然而,重要的是要了解如何正确有效地使用这种原始方法。一些常见的错误,以避免如下: 
    写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统。为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据。代码如下:

    
    
    1. dstream.foreachRDD { rdd =>
    2.   val connection = createNewConnection()  // executed at the driver
    3.   rdd.foreach { record =>
    4.     connection.send(record) // executed at the worker
    5.   }}

    上面的代码是一个错误的演示,因为连接是在Driver中创建的,而写数据是在worker中完成的。此时连接就需要被序列化然后发送到worker中。但是我们知道,连接的信息是不能被序列化和反序列化的(不同的机器连接服务器需要使用不同的服务器端口,即便连接被序列化了也不能使用)

    进而我们可以将连接移动到worker中实现,代码如下:

    
    
    1. dstream.foreachRDD { rdd =>
    2.   rdd.foreach { record =>
    3.     val connection = createNewConnection()
    4.     connection.send(record)
    5.     connection.close()
    6.   }}

     

    但是此时,每处理一条数据记录,就需要连接一次外部系统,对于性能来说是个严重的问题。这也不是一个完美的实现。

    Spark基于RDD进行编程,RDD的数据不能改变,如果擅长foreachPartition底层的数据可能改变,做到的方式foreachPartition操作一个数据结构,RDD里面一条条数据,但是一条条的记录是可以改变的spark也可以运行在动态数据源上。(就像数组的数据不变,但是指向的索引可以改变) 
    我们可以将代码做如下的改进:

    
    
    1. dstream.foreachRDD { rdd =>
    2.   rdd.foreachPartition { partitionOfRecords =>
    3.     val connection = createNewConnection()
    4.     partitionOfRecords.foreach(record => connection.send(record))
    5.     connection.close()
    6.   }}

     

    这样一个partition,只需连接一次外部存储。性能上有大幅度的提高。但是不同的partition之间不能复用连接。我们可以使用连接池的方式,使得partition之间可以共享连接。代码如下:

    
    
    1. stream.foreachRDD { rdd =>
    2.   rdd.foreachPartition { partitionOfRecords =>
    3.     // ConnectionPool is a static, lazily initialized pool of connections
    4.     val connection = ConnectionPool.getConnection()
    5.     partitionOfRecords.foreach(record => connection.send(record))
    6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
    7.   }}

     

    二:foreachRDD与foreachPartition实现实战

    1、需要注意的是: 
    (1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。 
    (2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。 
    (3)、在插入MySQL的时候最好使用批量插入。 
    (4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。 
    (5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

    2、下面我们使用SparkStreaming实现将数据写到MySQL中:

    (1)在pom.xml中加入如下依赖包

    
    
    1. <dependency>
    2.     <groupId>mysql</groupId>
    3.     <artifactId>mysql-connector-java</artifactId>
    4.     <version>5.1.38</version>
    5. </dependency>
    6. <dependency>
    7.     <groupId>commons-dbcp</groupId>
    8.     <artifactId>commons-dbcp</artifactId>
    9.     <version>1.4</version>
    10. </dependency>

    (2)在MySql中创建数据库和表,命令操作如下:

    
    
    1. mysql -uroot -p
    2. create database spark;
    3. use spark;
    4. show tables;
    5. create table streaming_itemcount(keyword varchar(30));

     

    使用Java编写一个数据库连接池类

    
    
    1. import java.sql.Connection;
    2. import java.sql.DriverManager;
    3. import java.util.LinkedList;
    4.  
    5. /**
    6.  * Created by zpf on 2016/8/26.
    7.  */
    8. public class ConnectionPool {
    9.     private static LinkedList<Connection> connectionQueue;
    10.  
    11.     static {
    12.         try {
    13.             Class.forName("com.mysql.jdbc.Driver");
    14.         } catch (ClassNotFoundException e) {
    15.             e.printStackTrace();
    16.         }
    17.     }
    18.  
    19.     public synchronized static Connection getConnection() {
    20.         try {
    21.             if (connectionQueue == null) {
    22.                 connectionQueue = new LinkedList<Connection>();
    23.                 for (int i = 0; i < 5; i++) {
    24.                     Connection conn = DriverManager.getConnection(
    25.                             "jdbc:mysql://Master:3306/sparkstreaming",
    26.                             "root",
    27.                             "12345");
    28.                     connectionQueue.push(conn);
    29.                 }
    30.             }
    31.         } catch (Exception e) {
    32.             e.printStackTrace();
    33.         }
    34.         return connectionQueue.poll();
    35.  
    36.     }
    37.     public  static void returnConnection(Connection conn){
    38.      connectionQueue.push(conn);
    39.     }
    40. }

     

    编写Spark代码:

    
    
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3.  
    4. /**
    5.   * Created by zpf on 2016/8/26.
    6.   */
    7. object OnlineForeachRDD2DB {
    8.   def main(args: Array[String]) {
    9.     val conf = new SparkConf().setAppName("OnlineForeachRDD2DB").setMaster("local[2]")
    10.     val ssc = new StreamingContext(conf, Seconds(5))
    11.  
    12.     val lines = ssc.socketTextStream("Master", 9999)
    13.     val words = lines.flatMap(_.split(" "))
    14.     val wordCounts = words.map(=> (x, 1)).reduceByKey(+ _)
    15.     wordCounts.foreachRDD { rdd =>
    16.       rdd.foreachPartition { partitionOfRecords => {
    17.         val connection = ConnectionPool.getConnection()
    18.         partitionOfRecords.foreach(record => {
    19.           val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
    20.           val stmt = connection.createStatement
    21.           stmt.executeUpdate(sql)
    22.         })
    23.         ConnectionPool.returnConnection(connection)
    24.  
    25.       }
    26.  
    27.       }
    28.     }
    29.   }
    30. }

     

    打开netcat发送数据

    
    
    1. root@spark-master:~# nc -lk 9999
    2. spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

     

    打包运行spark代码

    
    
    1. /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

     

    查看数据库中的结果:

    博文内容源自DT大数据梦工厂Spark课程总结的笔记相关课程内容视频可以参考: 百度网盘链接:http://pan.baidu.com/s/1slvODe1(如果链接失效或需要后续的更多资源,请联系QQ460507491或者微信号:DT1219477246 获取上述资料)。

  • 相关阅读:
    uniapp
    vue -element admin 修改request,headers添加参数
    uniapp
    css
    uniapp
    uniapp
    vue
    vue
    vue -element 修复select下拉框在移动端需要点击两次才能选中的问题
    vue
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723831.html
Copyright © 2011-2022 走看看