zoukankan      html  css  js  c++  java
  • python socketpool:通用连接池(转)

    简介

    在软件开发中经常要管理各种“连接”资源,通常我们会使用对应的连接池来管理,比如mysql数据库连接可以用sqlalchemy中的池来管理,thrift连接可以通过thriftpool管理,redis-py中的StrictRedis实现本身就是基于连接池的,等等。 而今天介绍的socketpool是一个通用的python连接池库,通过它可以实现任意类型连接的管理,虽然不是很完美,但在一些找不到合适连接池实现、而又不想自己造轮子的时候使用起来会节省很多精力。

    内部实现要点

    • 这个类库的代码其实并不是特别的漂亮,但结构设计的不错,关键留下了对拓展开放的钩子,能让使用者根据自己的需要定制自己的连接池
    • 内部主要的组件有ConnectionPool,Connector和backend_mod三个
      • ConnectionPool实现了一个连接池的通用逻辑,用一个优先级队列管理所有连接,另外支持connection的生命周期定制,有一个reap机制(可选),基本思想是每个conn有一个最大生命周期,比如600秒,过了这个时间,就必须回收掉,reap线程(也有可能是greenlet或eventlet)定期检查过期的conn并进行回收
      • Connector是一个接口,它可以看做是一个制造conn的工厂,ConnectionPool在需要新建conn的时候,会通过这个工厂来生成conn。所以我们只要实现Connector的接口方法就可以定制一个自己的连接工厂
      • backend_mod是为了支持不同的线程模型(比如python原生线程,gevent或者eventlet)抽象出来的后端模块,它统一封装了Socket, PriorityQueue, Semaphore等和并发模型相关的组件,在创造ConnectionPool对象时可以通过参数控制选用哪种backend

    部分代码阅读

    ConnectionPool的初始化函数

    复制代码
         def __init__(self, factory,
                      retry_max=3, retry_delay=.1,
                      timeout=-1, max_lifetime=600.,
                      max_size=10, options=None,
                      reap_connections=True, reap_delay=1,
                      backend="thread"):
    
             if isinstance(backend, str):
                 self.backend_mod = load_backend(backend)
                 self.backend = backend
             else:
                 self.backend_mod = backend
                 self.backend = str(getattr(backend, '__name__', backend))
             self.max_size = max_size
             self.pool = getattr(self.backend_mod, 'PriorityQueue')()
             self._free_conns = 0
             self.factory = factory
             self.retry_max = retry_max
             self.retry_delay = retry_delay
             self.timeout = timeout
             self.max_lifetime = max_lifetime
             if options is None:
                 self.options = {"backend_mod": self.backend_mod,
                                 "pool": self}
             else:
                 self.options = options
                 self.options["backend_mod"] = self.backend_mod
                 self.options["pool"] = self
    
             # bounded semaphore to make self._alive 'safe'
             self._sem = self.backend_mod.Semaphore(1)
    
             self._reaper = None
             if reap_connections:
                 self.reap_delay = reap_delay
                 self.start_reaper()
    复制代码
     

    这里几个参数的意义:

    • factory是类对象,需要实现Connector接口,用来生成conn,options是调用factory时传入的参数
    • retry_max是获取conn时如果出错最多重试几次
    • max_lifetime是规定每个conn最大生命时间,见上面说的reap机制
    • max_size是这个pool的大小上限
    • backend是线程模型
    • reap_connections控制是否启用reap机制

    被启动的reap就是一个单独的线程,定时调用下面的方法把过期的conn回收掉:

    复制代码
         def murder_connections(self):
             current_pool_size = self.pool.qsize()
             if current_pool_size > 0:
                 for priority, candidate in self.pool:
                     current_pool_size -= 1
                     if not self.too_old(candidate):
                         self.pool.put((priority, candidate))
                     else:
                         self._reap_connection(candidate)
                     if current_pool_size <= 0:
                         break
    复制代码

    _reap_connection最终会回调conn对象的invalidate方法(Connector的接口)进行销毁。每次使用完conn后会调用release_connection, 它的逻辑是

    复制代码
         def release_connection(self, conn):
             if self._reaper is not None:
                 self._reaper.ensure_started()
    
             with self._sem:
                 if self.pool.qsize() < self.max_size:
                     connected = conn.is_connected()
                     if connected and not self.too_old(conn):
                         self.pool.put((conn.get_lifetime(), conn))
                     else:
                         self._reap_connection(conn)
                 else:
                     self._reap_connection(conn)
    复制代码

    如果连接还没过期或断开,就会被重新放入优先级队列中,用户可以通过实现Connector接口的get_lifetime来控制这里放回的conn的优先级,priority最小的conn下次会被优先取出

    Connector定义了哪些接口呢?

    复制代码
     class Connector(object):
         def matches(self, **match_options):
             raise NotImplementedError()
    
         def is_connected(self):
             raise NotImplementedError()
    
         def handle_exception(self, exception):
             raise NotImplementedError()
    
         def get_lifetime(self):
             raise NotImplementedError()
    
         def invalidate(self):
             raise NotImplementedError()
    复制代码

    matches方法主要用在pool取出一个conn时,除了优先选择priority最小的conn,还需要这个conn和get(**options)传入的参数match,这个match就是回调conn的matches方法。其他几个接口前面都涉及到了。

    TcpConnector实现

    来看一下socketpool自带的TcpConnector的实现,实现tcp socket的工厂

    复制代码
     class TcpConnector(Connector):
    
         def __init__(self, host, port, backend_mod, pool=None):
             self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
             self._s.connect((host, port))
             self.host = host
             self.port = port
             self.backend_mod = backend_mod
             self._connected = True
             # use a 'jiggle' value to make sure there is some
             # randomization to expiry, to avoid many conns expiring very
             # closely together.
             self._life = time.time() - random.randint(0, 10)
             self._pool = pool
    
         def __del__(self):
             self.release()
    
         def matches(self, **match_options):
             target_host = match_options.get('host')
             target_port = match_options.get('port')
             return target_host == self.host and target_port == self.port
    
         def is_connected(self):
             if self._connected:
                 return util.is_connected(self._s)
             return False
    
         def handle_exception(self, exception):
             print('got an exception')
             print(str(exception))
    
         def get_lifetime(self):
             return self._life
    
         def invalidate(self):
             self._s.close()
             self._connected = False
             self._life = -1
    
         def release(self):
             if self._pool is not None:
                 if self._connected:
                     self._pool.release_connection(self)
                 else:
                     self._pool = None
    
         def send(self, data):
             return self._s.send(data)
    
         def recv(self, size=1024):
             return self._s.recv(size)
    复制代码

    不需要太多额外解释。

    拓展实现HiveConnector

    根据自身项目需要,我用pyhs2实现了一个hive连接池

    复制代码
     class HiveConnector(Connector):
    
         def __init__(self, host, port, backend_mod, pool=None, authMechanism='NOSASL',
                      **options):
             self.host = host
             self.port = port
             self.backend_mod = backend_mod
             self._pool = pool
             self._connected = False
             self._conn = pyhs2.connect(host=host,
                                        port=port,
                                        authMechanism=authMechanism
                                        )
             self._connected = True
             # use a 'jiggle' value to make sure there is some
             # randomization to expiry, to avoid many conns expiring very
             # closely together.
             self._life = time.time() - random.randint(0, 10)
    
         def __del__(self):
             self.release()
    
         def matches(self, **match_options):
             target_host = match_options.get('host')
             target_port = match_options.get('port')
             return target_host == self.host and target_port == self.port
    
         def is_connected(self):
             return self._connected
    
         def handle_exception(self, exception):
             logger.exception("error: %s" % str(exception))
    
         def get_lifetime(self):
             return self._life
    
         def invalidate(self):
             try:
                 self._conn.close()
             except:
                 pass
             finally:
                 self._connected = False
                 self._life = -1
    
         def release(self):
             if self._pool is not None:
                 if self._connected:
                     self._pool.release_connection(self)
                 else:
                     self._pool = None
    
         def cursor(self):
             return self._conn.cursor()
    
         def execute(self, hql):
             with self.curosr() as cur:
                 return cur.execute(hql)
    
     hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)
    复制代码

    使用这个hive_pool去执行hql语句非常容易:

         with hive_pool.connection() as conn:
             with conn.cursor() as cur:
                 print cur.getDatabases()

    总结

    简绍了socketpool的内部实现,以及如何使用它构造自己的连接池。

    转自:https://www.cnblogs.com/quijote/p/4388900.html

  • 相关阅读:
    还是模块
    模块
    Django之中间件和Auth模块
    Django之form表单组件、cookie与session
    ORM表查询之F查询和Q查询以及事务
    django之单表和多表查询
    django之模板层
    Django之路由
    Django之前戏
    前端之Bootstrap框架
  • 原文地址:https://www.cnblogs.com/zl1991/p/10825724.html
Copyright © 2011-2022 走看看