#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''@auther :mr.qin
@IDE:pycharm'''
from tool.Common import *
from tool.request_data_change_to_StrEncode import request_data_change_to_str
from tool.tencent_cloud_mysql_connect import Mysql_operation
import time
import requests
from multiprocessing import Process,Queue,Pool,Manager,Lock
import os
my_op = Mysql_operation()
get_data = '''select order_id,order_sn from order_info limit 100;'''
data = my_op.sql_operation(get_data)
data_to_str = request_data_change_to_str()
lock=Lock()
def testDoWeChatNotify(q):
fwh_order_dict = {}
fwh_order_id_list, fwh_order_sn_list = [], []
if data!=():
for a in data:
fwh_order_id=a['order_id']
fwh_order_sn=a['order_sn']
fwh_order_id_list.append(fwh_order_id)
fwh_order_sn_list.append(fwh_order_sn)
fwh_order_dict['fwh_order_id']=fwh_order_id_list
fwh_order_dict['fwh_order_sn']=fwh_order_sn_list
# print("put %s to queue...." % (fwh_order_dict))
q.put(fwh_order_dict)
return q.qsize()#返回队列数量
def asynchronousPay_process(q,order_id,order_sn):
if (data!=()):
url_wechat_success_huidiao=fwh_test_api+'/index/Order/doWeChatNotify'
data_wechat_success_huidiao=data_to_str.requestDataToStr_firefoxAndChrome_fwh('''order_sn:{}
order_id:{}
meth_id:4
timestamp:157129653969
sign:0687b01b300b9e300d3996a9d2173f1380973e5a'''.format(order_sn,order_id))
# print(data_wechat_success_huidiao)
request_wechat_success_huidiao=requests.post(url=url_wechat_success_huidiao,headers=headers_form_urlencoded,data=data_wechat_success_huidiao)
response_wechat_success_huidiao=request_wechat_success_huidiao.json()
if '订单状态错误,非待支付订单' in response_wechat_success_huidiao['msg']:
print(data_wechat_success_huidiao)
# print(request_wechat_success_huidiao.json())
else:
print('待支付订单为空')
def run_multiprocess():
p = Pool(9)
q = Manager().Queue()
size = list(range(testDoWeChatNotify(q)))
info = q.get()
for i in range(len(info['fwh_order_id'])):
lock.acquire()#加互斥锁
# print('当前进程{},父进程id{}'.format(os.getpid(),os.getppid()))
p.apply_async(func=asynchronousPay_process,
args=(q, info['fwh_order_id'][i], info['fwh_order_sn'][i]))
lock.release()#解锁
p.close()
p.join()
if __name__=="__main__":
start_time = time.time() # 计算程序开始时间
run_multiprocess()
# q=Manager().Queue()
# size = list(range(testDoWeChatNotify(q)))
# info = q.get()
# for i in range(len(info['fwh_order_id'])):
# asynchronousPay_process(q,info['fwh_order_id'][i], info['fwh_order_sn'][i])
print('程序耗时{:.2f}'.format(time.time() - start_time)) # 计算程序总耗时
思路:与多线程实现思路差不多,但是所用模块不同,多进程用multiprocessing模拟,多线程用therading模块
多进程队列使用multiprocessing.Queue()或者multiprocessing.Manager().Queue(),多线程使用queue.Queue(),apply_async方法是apply方法的非阻塞式(异步)版本,Queue引入确实提高了执行速度,亲测:为使用多进程前支付100个订单用了四十多秒,使用之后用了5.x秒,
这根电脑的性能会有一些关系!
问题:
1.在调试的过程中我想用一个封装一个类来完成,但是调试了很久,都有问题(队列无法公用的问题),忘大佬指教;
2.线程启动的数量更多(进程启动了8个,线程启动了100个),为啥会没有多进程的执行速度快?(同时使用join的情况下),这里修正一下,多线程执行速度会快一些,因为代码跟运行速度不一致所以造成了这种错觉
3.会出现重复请求的请求,但是只会有一次(我创建了八个进程,为什么只会重复一次)