When writing Python concurrency code in the future, refer to the snippets below. Xiaoxi's summary is quite complete, so I'm copying it here directly.
Process concurrency
from multiprocessing import Pool, Manager

def func(d, results):
    res = d + 1
    print(res)
    results.append(res)          # append to the shared Manager list

if __name__ == "__main__":
    num = 5
    data = range(40)
    print(data)
    pool = Pool(processes=num)
    manager = Manager()
    results = manager.list()     # proxy list that worker processes can write to
    jobs = []
    for d in data:
        job = pool.apply_async(func, (d, results))
        jobs.append(job)
    pool.close()
    pool.join()
    print(results)
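Side note: in the snippet above the jobs list is collected but never read back, so an exception inside func would go unnoticed. A minimal alternative sketch (my own addition, not from Xiaoxi's note) that drops the Manager list and pulls results straight from the AsyncResult objects:

from multiprocessing import Pool

def func(d):
    return d + 1

if __name__ == "__main__":
    data = range(40)
    with Pool(processes=5) as pool:
        jobs = [pool.apply_async(func, (d,)) for d in data]
        results = [job.get() for job in jobs]   # get() blocks until each result is ready
    print(results)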
Thread concurrency
from multiprocessing.pool import ThreadPool

def func(d):
    res = d + 1
    print(res)
    return res

def ThreadPools():
    num = 5
    data = range(40)
    print(data)
    jobs = []
    results = []
    pool = ThreadPool(num)
    for d in data:
        job = pool.apply_async(func, (d,))
        jobs.append(job)
    pool.close()
    pool.join()
    for i in jobs:
        results.append(i.get())   # get() returns func's return value (or re-raises its exception)
    print(results)

if __name__ == '__main__':
    ThreadPools()
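The same fan-out can also be written with the standard library's concurrent.futures. A sketch for comparison (my own addition), reusing the same func:

from concurrent.futures import ThreadPoolExecutor

def func(d):
    res = d + 1
    print(res)
    return res

if __name__ == '__main__':
    data = range(40)
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(func, data))   # results come back in input order
    print(results)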
Coroutine concurrency
Python 3 version, using the gevent library
import gevent
from gevent import monkey, pool; monkey.patch_all()
from gevent import Timeout

def func(d):
    res = d + 1
    print(res)
    return res

def GeventPools():
    num = 8
    data = range(40)
    print(data)
    results = []
    p = pool.Pool(num)
    timer = Timeout(60 * 120)    # overall timeout: give all jobs up to 120 minutes
    timer.start()
    jobs = []
    for d in data:
        job = p.spawn(func, d)
        jobs.append(job)
    try:
        gevent.joinall(jobs)     # wait for all jobs to finish
    except Timeout:
        print("[-] Time out....")
    except Exception as e:
        print("[-] error: {}".format(e))
    finally:
        timer.cancel()           # stop the timer once the jobs are done
    for i in jobs:
        results.append(i.get())
    print(results)

if __name__ == '__main__':
    GeventPools()
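When the overall timeout and error handling are not needed, gevent's pool also exposes map, which gives a much shorter version of the same idea (a rough sketch of my own, same toy func assumed):

from gevent import monkey; monkey.patch_all()
from gevent.pool import Pool

def func(d):
    return d + 1

if __name__ == '__main__':
    p = Pool(8)
    results = p.map(func, range(40))   # spawns greenlets and returns results in order
    print(results)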
Python 3 version, using the asyncpool library
import asyncio
import asyncpool
import logging
import functools

def func(d):
    res = d + 1
    print(res)
    return res

def asyncmul():
    async def worker_coro(data, result_queue):
        # run the blocking func in the default executor so the event loop stays free
        results = await loop.run_in_executor(None, functools.partial(func, data))
        await result_queue.put(results)

    async def result_reader(queue):
        while True:
            value = await queue.get()
            if value is None:        # None is the sentinel that ends the reader
                break
            results.append(value)

    async def run():
        result_queue = asyncio.Queue()
        reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
        # Start a worker pool with `num` coroutines; each worker_coro invocation
        # must finish within max_task_time seconds (5 minutes here).
        async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
                                       logger=logging.getLogger("WorkerPool"),
                                       worker_co=worker_coro, max_task_time=5 * 60,
                                       log_every_n=10) as pool:
            for d in data:
                await pool.push(d, result_queue)
        await result_queue.put(None)
        await reader_future

    num = 8
    data = range(40)
    print(data)
    results = []
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    print(results)

if __name__ == '__main__':
    asyncmul()
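If pulling in the third-party asyncpool is not desirable, a similar bounded fan-out can be done with asyncio alone. A rough sketch of my own using asyncio.Semaphore to cap concurrency; the helper name bounded_func is mine, not part of any library:

import asyncio

def func(d):
    return d + 1

async def main(num=8, data=range(40)):
    sem = asyncio.Semaphore(num)            # at most `num` calls in flight at once
    loop = asyncio.get_running_loop()

    async def bounded_func(d):
        async with sem:
            # run the blocking func in the default thread pool executor
            return await loop.run_in_executor(None, func, d)

    results = await asyncio.gather(*(bounded_func(d) for d in data))
    print(results)

if __name__ == '__main__':
    asyncio.run(main())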
Commonly used Python regex:
import re

html = 'aa11bb22cc'
pattern = re.compile(r'aa(.+?)b')    # non-greedy: capture the shortest run between 'aa' and 'b'
print(pattern.findall(html))         # ['11']

import re

html = 'aa11bb22cc'
print(re.findall(r'aa(.+?)b', html))  # same result without pre-compiling: ['11']
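A couple of other calls I reach for often; these examples are my own additions, not from the original note:

import re

html = 'aa11bb22cc'

m = re.search(r'aa(\d+)bb', html)    # first match only, or None
if m:
    print(m.group(1))                # 11
print(re.sub(r'\d+', '-', html))     # replace every digit run: aa-bb-cc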
Use requests' Session object to handle this. A session object lets you persist certain parameters across requests, and it also keeps cookies across all requests made from the same Session instance.
import requests

def login():
    '''Login endpoint: /auth/login'''
    s = requests.Session()
    r = s.post(
        url='http://11X.39.63.XX:20080/auth/login',
        data={'username': 'system', 'password': '123456'})
    return s                          # the session now carries the login cookies

def selectable():
    r = login().get(
        url='http://11X.39.63.XX:20080/depot/parks/selectable')
    print(r.status_code)
    print(r.text)

selectable()
Upload an image
import requests

upload_url = "http://baidu.com"
header = {"ct": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"}
proxies = {"http": "http://127.0.0.1:8082"}
# (filename, file object, content type) tuple builds the multipart/form-data part
files = {'file': ('hh.jpg', open('hh.jpg', 'rb'), 'image/jpeg')}
upload_data = {"parentId": "", "fileCategory": "personal", "fileSize": 179,
               "fileName": "summer_text_0920.txt", "uoType": 1}
upload_res = requests.post(url=upload_url, data=upload_data, files=files,
                           headers=header, proxies=proxies)
For process+thread and process+coroutine concurrency, refer to the link below. Leaving this as a placeholder for now; I'll come back and fix it when I hit problems in actual use. A rough sketch of the process+thread combination is appended after the link.
http://momomoxiaoxi.com/python/2019/03/12/python/
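A minimal sketch of the process+thread idea, assuming the same toy func (my own placeholder code, not from the linked post): each worker process runs its own thread pool over one slice of the data.

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

def func(d):
    return d + 1

def thread_worker(chunk, num_threads=5):
    # each process runs a thread pool over its own slice of the data
    with ThreadPool(num_threads) as tpool:
        return tpool.map(func, chunk)

if __name__ == '__main__':
    data = list(range(40))
    chunks = [data[i:i + 10] for i in range(0, len(data), 10)]   # 4 contiguous slices
    with Pool(processes=4) as ppool:
        nested = ppool.map(thread_worker, chunks)
    results = [r for chunk_res in nested for r in chunk_res]     # flatten, order preserved
    print(results)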