zoukankan      html  css  js  c++  java
  • Spark Streaming之妙用foreachRDD和foreachPartition —— 在foreachPartition去中去创建socket是明智的做法,直接rdd是有问题的

    Spark Streaming之妙用foreachRDD和foreachPartition

    https://blog.csdn.net/u013709270/article/details/78857802


    展开
    0. 前言
      DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。所以要掌握它,对它要有深入了解。下面有一些常用的错误需要理解。经常写数据到外部系统需要创建一个连接的对象(例如根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。为此,开发者需要在Spark的driver创建一个对象用于连接。

    1. 问题
      为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在 Spark worker 中尝试调用这个连接对象保存记录到RDD中。如下:

    dstream.foreachRDD { rdd =>
    val connection = createNewConnection() // executed at the driver
    rdd.foreach { record =>
    connection.send(record) // executed at the worker
    }
    }
    1
    2
    3
    4
    5
    6
    2. 解决方案
      这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。

      然而,这会造成另外一个常见的错误:为每一个记录创建了一个连接对象。例如:

    dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
    }
    }
    1
    2
    3
    4
    5
    6
    7
    3. 优化方案
      通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用rdd.foreachPartition方法。 为RDD的partition创建一个连接对象,用这个连接对象发送 partition 中的所有记录。

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record =>
    connection.send(record)
    )
    connection.close()
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    4. 进一步优化  
      最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支:

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record =>
    connection.send(record)
    )
    ConnectionPool.returnConnection(connection)
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
      需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。

    5. 写在最后
    其它需要注意的地方:

    输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。

    默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。
    ————————————————
    版权声明:本文为CSDN博主「JeemyJohn」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/u013709270/java/article/details/78857802

  • 相关阅读:
    Rails中使用图表(ChartJs)显示数据
    Ubuntu下sublime无法输入中文问题完美解决
    ruby--$:.unshift File.expand_path('..', __FILE__)(转载)
    消息闪现
    Rails中编写自己的任务
    Rails定时任务
    rails中设置配置信息 读取配置信息
    在lib下编写的自己的类需要用到model内相关方法
    rails_ajax 验证验证码
    rails添加验证码
  • 原文地址:https://www.cnblogs.com/bonelee/p/12919624.html
Copyright © 2011-2022 走看看