  • Combining multiprocessing and gevent in Python

    To some people, combining gevent with multiprocessing is a working model that is both sophisticated and a bit unusual.

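    Python threads are limited by the GIL (Global Interpreter Lock): only one thread can execute Python bytecode at a time, so threads cannot spread Python code across several CPU cores. gevent's greenlets are coroutines, not OS threads; all the greenlets of a process run inside a single OS thread and are bound by the same limit. What gevent adds is control over scheduling: it switches to another greenlet whenever the current one would block on I/O.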
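
    The cooperative switching described above can be seen in a small sketch. It uses asyncio from the standard library as a stand-in for gevent (an assumption made so the example runs without gevent installed): like greenlets, asyncio coroutines all run in one OS thread and switch only at explicit points, here `await asyncio.sleep(0)`, which plays roughly the role of `gevent.sleep(0)` or a monkey-patched I/O call. The hypothetical `hog` coroutine never yields, so it runs to completion before anyone else gets a turn, just as a CPU-bound greenlet starves its neighbours.

```python
import asyncio
import threading

# (name, step, OS thread id), recorded in execution order.
# asyncio is a stdlib stand-in for gevent here: one OS thread, cooperative switches.
events = []


async def cooperative(name, steps):
    for step in range(steps):
        events.append((name, step, threading.get_ident()))
        await asyncio.sleep(0)  # explicit switch point, like gevent.sleep(0)


async def hog(name, steps):
    # No await inside the loop: nothing else can run until it finishes,
    # which is what a CPU-bound greenlet does to the others.
    for step in range(steps):
        events.append((name, step, threading.get_ident()))


async def main():
    await asyncio.gather(cooperative("a", 2), hog("h", 2), cooperative("b", 2))


asyncio.run(main())
print([(name, step) for name, step, _ in events])
print(len({tid for _, _, tid in events}))  # distinct OS threads used
```

    The printed order shows `a` and `b` taking turns while `h` runs in one go, and every event comes from the same OS thread.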

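    So how can you use multiple CPU cores? Use multiprocessing to run work in parallel across cores, and inside each process use the gevent coroutine framework to schedule I/O. Compared with threads, greenlets avoid unnecessary OS-level context switches.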
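
    A minimal sketch of the multi-core half, assuming a POSIX system where the `fork` start method is available: CPU-bound work (a hypothetical prime-counting task) is split into one chunk per core and handed to a `multiprocessing` pool. Each worker is a separate interpreter with its own GIL, so the chunks genuinely run in parallel. `count_primes`, `split_range` and `parallel_count_primes` are illustrative names, not part of the original code.

```python
import multiprocessing as mp


def count_primes(bounds):
    """CPU-bound work: count primes in [lo, hi) by trial division."""
    lo, hi = bounds
    count = 0
    for n in range(max(lo, 2), hi):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def split_range(n, parts):
    """Split [0, n) into `parts` contiguous, nearly equal chunks."""
    step, extra = divmod(n, parts)
    chunks, lo = [], 0
    for i in range(parts):
        hi = lo + step + (1 if i < extra else 0)
        chunks.append((lo, hi))
        lo = hi
    return chunks


def parallel_count_primes(n, workers=None):
    workers = workers or mp.cpu_count()
    # Assumption: POSIX system; "fork" lets the children inherit count_primes.
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=workers) as pool:
        # Each chunk runs in its own process, with its own interpreter and GIL.
        return sum(pool.map(count_primes, split_range(n, workers)))


if __name__ == "__main__":
    print(parallel_count_primes(100_000))
```

    Calling it with `workers=1` runs the same work on a single core, which gives a simple baseline when timing the speed-up.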

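    Enough talk; here is an example. The code below implements the producer/consumer pattern across multiple processes. It is quite simple; run it once and you will see how it works.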

    # blog: xiaorui.cc

    # Monkey-patch before anything else is imported, so that modules
    # imported afterwards (threading, socket, ...) pick up gevent's versions.
    from gevent import monkey
    monkey.patch_all()

    import datetime
    from multiprocessing import Process, cpu_count, JoinableQueue

    import gevent


    class Consumer(object):
        """Runs `no_tasks` greenlets that consume from a shared process queue."""

        def __init__(self, q, no_tasks, name):
            self._no_tasks = no_tasks
            self._queue = q
            self.name = name
            self._rungevent(self._queue, self._no_tasks)

        def _rungevent(self, q, no_tasks):
            jobs = [gevent.spawn(self._printq) for _ in range(no_tasks)]
            gevent.joinall(jobs)

        def _printq(self):
            while True:
                # Note: multiprocessing's Queue.get() is not gevent-aware; while it
                # waits, it blocks the whole process, not just this greenlet.
                value = self._queue.get()
                try:
                    if value is None:  # sentinel: this greenlet is done
                        break
                    print("{0} time: {1}, value: {2}".format(
                        self.name, datetime.datetime.now(), value))
                finally:
                    # JoinableQueue expects one task_done() per get(), sentinels included.
                    self._queue.task_done()


    class Producer(object):
        """Runs `no_tasks` greenlets that each put 100 numbers on the queue."""

        def __init__(self, q, no_tasks, name):
            print(name)
            self._q = q
            self._no_tasks = no_tasks
            self.name = name
            self._rungevent()

        def _rungevent(self):
            jobs = [gevent.spawn(self.produce) for _ in range(self._no_tasks)]
            gevent.joinall(jobs)
            # Sentinels are not sent from here: another producer process may still
            # be running, and consumers that stopped early would strand its numbers
            # in the queue. main() sends them once every producer has finished.
            self._q.close()  # this process will put nothing more

        def produce(self):
            for no in range(100):
                print(no)
                self._q.put_nowait(no)


    def main():
        total_cores = cpu_count()
        total_processes = total_cores * 2  # half producers, half consumers
        q = JoinableQueue()
        producer_gevents = 10
        consumer_gevents = 7
        print("gevent on top of multiprocessing: {0} producer greenlets and {1} "
              "consumer greenlets per process".format(producer_gevents, consumer_gevents))
        producers, consumers = [], []
        start = datetime.datetime.now()
        for x in range(total_processes):
            if x % 2 == 0:
                p = Process(target=Producer, args=(q, producer_gevents, "producer %d" % x))
                producers.append(p)
            else:
                p = Process(target=Consumer, args=(q, consumer_gevents, "consumer %d" % x))
                consumers.append(p)
            p.start()

        # Wait until every number is on the queue, then send one sentinel per
        # consumer greenlet, so no consumer stops before the data runs out.
        for p in producers:
            p.join()
        for _ in range(consumer_gevents * len(consumers)):
            q.put(None)
        q.close()
        q.join_thread()  # flush this process's buffered sentinels

        for p in consumers:
            p.join()

        print("{0} processes with {1} producer greenlets and {2} consumer greenlets "
              "took {3:.2f} seconds to produce and consume {4} numbers".format(
                  total_processes,
                  producer_gevents * total_cores,
                  consumer_gevents * total_cores,
                  (datetime.datetime.now() - start).total_seconds(),
                  producer_gevents * total_cores * 100))


    if __name__ == '__main__':
        main()
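
    The example's shutdown logic depends on two `multiprocessing.JoinableQueue` rules. Each consumer loop stops when it receives one sentinel (`None`, a "poison pill"), so exactly one sentinel must be sent per consumer loop, and only after all real data. And every `get()` should be matched by one `task_done()`, which is what lets `join()` return. The gevent-free sketch below shows just those two rules with plain processes; `consumer`, `run` and the squaring step are hypothetical, and it assumes a POSIX system with the `fork` start method.

```python
import multiprocessing as mp

SENTINEL = None  # "poison pill": one per consumer loop


def consumer(q, results):
    """Square items until a sentinel arrives (hypothetical workload)."""
    while True:
        item = q.get()
        try:
            if item is SENTINEL:
                return
            results.put(item * item)
        finally:
            q.task_done()  # exactly one task_done() per get(), sentinels included


def run(items, n_consumers=3):
    ctx = mp.get_context("fork")  # assumption: POSIX system with fork available
    q = ctx.JoinableQueue()
    results = ctx.Queue()
    workers = [ctx.Process(target=consumer, args=(q, results))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for item in items:
        q.put(item)
    # Sentinels go in only after all real items: one per consumer loop.
    for _ in range(n_consumers):
        q.put(SENTINEL)
    q.join()  # returns once task_done() has matched every put()
    q.close()
    q.join_thread()  # stop this process's feeder thread
    out = sorted(results.get() for _ in range(len(items)))
    for w in workers:
        w.join()  # safe now: the results queue has been drained
    return out


if __name__ == "__main__":
    print(run([1, 2, 3, 4]))
```

    Results are drained from `results` before the workers are joined: joining a process that still has buffered queue data can deadlock, a case the `multiprocessing` programming guidelines warn about.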

  • Original article: https://www.cnblogs.com/leijiangtao/p/11944182.html