zoukankan      html  css  js  c++  java
  • How to implement connection pool in spark streaming

    在spark streaming的文档里,有这么一段:

    def sendPartition(iter):
        # ConnectionPool is a static, lazily initialized pool of connections
        connection = ConnectionPool.getConnection()
        for record in iter:
            connection.send(record)
        # return to the pool for future reuse
        ConnectionPool.returnConnection(connection)
    
    dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

    但是怎么让worker得到一个ConectionPool呢?简单的想法是在使用static变量指向一个ConnectionPool。但这里有一个讲究:怎么保证这个ConnectionPool是worker上的,而不是driver上的?

    用pyhton为例:

    在ConnectionPool.py里实现一个pool

    #/usr/bin/python
    #connection_pool.py
    import
    psycopg2 import settings from DBUtils.PooledDB import PooledDB pool = PooledDB(psycopg2, settings.connection_pool_size, host=settings.db_host, database=settings.database, user=settings.db_user, password=settings.db_password)
    def getConnection():
    return pool.connection()

    假设stream的主代码在main.py里,提交spark

    spark-submit --py-files connection_pool.py main.py

    这样connection_pool.py将被发送到worker执行,main.py里的 sendPartition 在worker节点上执行的时候就可以获得ConnectionPool.getConnection()调用。

    这里的关键是明白哪些代码在driver上跑,哪些在worker上跑。

     
  • 相关阅读:
    Grunt中批量无损压缩图片插件--Grunt-contrib-imagemin
    移动端前端题
    前端面试题,不解释...
    HTMLFormElement获取表单里面所有的值然后以json形式返回
    关于 CommonJS AMD CMD UMD 规范的差异总结
    mysql库安装
    安装mysql到ubuntu
    一个致命的操作
    vim编辑器常规配置
    samba的安装与配置
  • 原文地址:https://www.cnblogs.com/englefly/p/4579863.html
Copyright © 2011-2022 走看看