zoukankan      html  css  js  c++  java
  • Python各类并发模版

    以后在写一些Python并发的时候参考下面这个模块,小西总结的挺全的,直接搬砖了。

    进程并发

    from multiprocessing import Pool, Manager
    def func(d, results):
        res = d + 1
        print(res)
        results.append(res)
    
    
    if __name__ == "__main__":
        num = 5
        data = range(40)
        print(data)
        pool = Pool(processes=num)
        manager = Manager()
        results = manager.list()
        jobs = []
        for d in data:
            job = pool.apply_async(func, (d, results))
            jobs.append(job)
        pool.close()
        pool.join()
        print(results)
    from multiprocessing import Pool, Manager
    
    
    def func(d, results):
        res = d + 1
        print(res)
        results.append(res)
    
    
    if __name__ == "__main__":
        num = 5
        data = range(40)
        print(data)
        pool = Pool(processes=num)
        manager = Manager()
        results = manager.list()
        jobs = []
        for d in data:
            job = pool.apply_async(func, (d, results))
            jobs.append(job)
        pool.close()
        pool.join()
        print(results)
    
    

    线程并发

    from multiprocessing.pool import ThreadPool
    
    def func(d):
        res = d + 1
        print(res)
        return res
    
    
    def ThreadPools():
        num = 5
        data = range(40)
        print(data)
        jobs = []
        results = []
        pool = ThreadPool(num)
        for d in data:
            job = pool.apply_async(func, (d,))
            jobs.append(job)
        pool.close()
        pool.join()
        for i in jobs:
            results.append(i.get())
        print(results)
    
    
    if __name__ == '__main__':
        ThreadPools()
    
    

    协程并发

    python3版本利用gevent库

    import gevent
    from gevent import monkey, pool; monkey.patch_all()
    from gevent import Timeout
    
    def func(d):
        res = d + 1
        print(res)
        return res
    
    
    def GeventPools():
        num = 8
        data = range(40)
        print(data)
        results = []
        p = pool.Pool(num)
        timer = Timeout(60 * 1200).start()  # Execute up to 120 minutes per coroutine
        jobs = []
        for d in data:
            job = p.spawn(func, d)
            jobs.append(job)
        try:
            gevent.joinall(jobs)  # wait all jobs done
        except Timeout:
            print("[-] Time out....")
        except Exception as e:
            print("[-] error:{}".format(e))
        finally:
            pass
        for i in jobs:
            results.append(i.get())
        print(results)
    
    if __name__=='__main__':
        GeventPools()
    

    python3版本利用asyncpool库

    import asyncio
    import asyncpool
    import logging
    import functools
    
    
    
    def func(d):
        res = d + 1
        print(res)
        return res
    
    
    def asyncmul():
        async def worker_coro(data, result_queue):
            # print("Processing Value! -> {}".format(data))
            results = await loop.run_in_executor(None, functools.partial(func, data))
            await result_queue.put(results)
    
        async def result_reader(queue):
            while True:
                value = await queue.get()
                if value is None:
                    break
                results.append(value)
                # print("Got value! -> {}".format(value))
    
        async def run():
            result_queue = asyncio.Queue()
            reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
            # Start a worker pool with 10 coroutines, invokes `example_coro` and waits for it to complete or 5 minutes to pass.
            async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
                                           logger=logging.getLogger("WorkerPool"),
                                           worker_co=worker_coro, max_task_time=5 * 60,
                                           log_every_n=10) as pool:
                for d in data:
                    await pool.push(d, result_queue)
    
            await result_queue.put(None)
            await reader_future
    
        num = 8
        data = range(40)
        print(data)
        results = []
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run())
        print(results)
    
    asyncmul()
    

    python常用正则:

    import re
    html='aa11bb22cc'
    pattern=re.compile(r'aa(.+?)b')
    print pattern.findall(html)
    
    import re
    html='aa11bb22cc'
    print re.findall(r'aa(.+?)b',html)
    

    requests的session会话对象来进行处理。会话对象让你能够跨请求保持某些参数。它也会在同一个 Session 实例发出的所有请求之间保持 cookie

    import  requests
    
    def login():
       '''登录接口:/auth/login'''
       s=requests.Session()
       r=s.post(
          url='http://11X.39.63.XX:20080/auth/login',
          data={'username':'system','password':'123456'})
       return s
    
    def selectable():
       r=login().get(
          url='http://11X.39.63.XX:20080/depot/parks/selectable')
       print r.status_code
       print r.text
    
    selectable()
    

    上传图片

    import requests
    upload_url="http://baidu.com"
    header={"ct":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"}
    proxies={"http":"http://127.0.0.1:8082"}
    files = {'file':('hh.jpg',open('hh.jpg','rb'),'image/jpeg')}
    upload_data={"parentId":"","fileCategory":"personal","fileSize":179,"fileName":"summer_text_0920.txt","uoType":1}
    upload_res=requests.post(url=upload_url,data=upload_data,files=files,headers=header,proxies=proxies)
    

    进程+线程并发,进程+协程并发参考下面链接,先留个坑,以后用的时候遇到问题再来改
    http://momomoxiaoxi.com/python/2019/03/12/python/

  • 相关阅读:
    Java 常用工具类
    Shiro 分析
    Oracle 恢复表操作内容
    Struts2 中的配置文件 package name 、namespace 以及 对象方法调用
    MySql 修改字符集
    命名空间、静态函数、实例函数
    Eclipse Tomcate 热部署
    Java Json
    Mybatis 存储过程调用
    HDFS源码分析心跳汇报之数据结构初始化
  • 原文地址:https://www.cnblogs.com/afanti/p/10536963.html
Copyright © 2011-2022 走看看