zoukankan      html  css  js  c++  java
  • python中的并发执行

    一.Gevent

    1. Gevent实例

    import gevent
    import requests
    from gevent import monkey
    # socket发送请求以后就会进入等待状态,gevent更改了这个机制
    # socket.setblocking(False)  -->发送请求后就不会等待服务器响应
    monkey.patch_all()  # 找到内置的socket并更改为gevent自己的东西
     
    def fetch_async(method, url, req_kwargs):
        """Send one HTTP request and print the final URL and the raw body.

        :param method: HTTP verb handed to ``requests.request``.
        :param url: target address.
        :param req_kwargs: extra keyword arguments forwarded to ``requests.request``.
        """
        print(method, url, req_kwargs)
        resp = requests.request(method, url, **req_kwargs)
        print(resp.url, resp.content)
     
    # ##### 发送请求 #####
    # One greenlet (coroutine) is spawned per URL and each one runs fetch_async;
    # joinall() is handed the whole list of greenlets.
    target_urls = ('https://www.python.org/', 'https://www.yahoo.com/', 'https://github.com/')
    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url=target, req_kwargs={})
        for target in target_urls
    ])

    2. grequests实例

    grequests实际上就是封装了gevent里面的方法,然后配合requests实现异步的IO
    grequests = gevent + requests
    grequests.map() 的内部实现: 循环执行gevent内部的joinall()方法来并发发送所有请求
    import grequests  # 实际上就是requests + gevent
    # Build the GET requests that will be handed to grequests.map();
    # only the first one carries an explicit timeout.
    request_list = [
        grequests.get(address, **options)
        for address, options in (
            ('https://www.baidu.com/', {'timeout': 10.001}),
            ('https://www.taobao.com/', {}),
            ('https://hao.360.cn/', {}),
        )
    ]
    # ##### Execute and collect the list of responses #####
    # Per the original author's note, map() drives gevent's joinall() internally.
    response_list = grequests.map(request_list)
    print(response_list)
     
    # ##### 执行并获取响应列表(处理异常) #####
    # def exception_handler(request, exception):
    #     print(request, exception)
    #     print("Request failed")
     
    # response_list = grequests.map(request_list, exception_handler=exception_handler)
    # print(response_list)

    3. 项目中的应用

        def user_batch_import(self, token, file, data):
            """Import users in bulk from an uploaded Excel workbook.

            Every non-empty sheet must start with the exact header row
            ['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色'].  Each following
            row is turned into one user-creation payload, and the payloads of
            a sheet are handed to ``self._user_batch_import`` through a queue.

            :param token: forwarded to ``self._user_role`` to resolve the caller.
            :param file: uploaded workbook, stored via ``Graphics.upload_image``.
            :param data: mapping providing ``context_type`` and ``content_type``.
            :return: dict with ``errcode`` and ``errmsg``.  On success ``errmsg``
                is ``str()`` of the result of the last processed sheet (results
                of earlier sheets are overwritten, as in the original code).
            """
            v = self._user_role(token, {})
            if "errcode" in v:
                return v

            if 'user[college_id]' not in v:
                return {"errcode": 2, "errmsg": str(v)}

            import xlrd

            expected_header = ['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']
            # Spreadsheet role label -> enrollment type name sent to the API.
            role_names = {
                "老师": "TeacherEnrollment",
                "学生": "StudentEnrollment",
                "助教": "TaEnrollment",
                "设计者": "DesignerEnrollment",
                "观察者": "ObserverEnrollment",
            }

            show_url = Graphics.upload_image(file, data["context_type"], data["content_type"])
            data_path = Graphics.format_nginx_data_url(show_url)
            book = xlrd.open_workbook(data_path)

            res = None
            for name in book.sheet_names():
                sheet = book.sheet_by_name(name)
                nrows = sheet.nrows
                if nrows == 0:
                    # Nothing to import from an empty sheet.
                    continue

                # The header must match exactly.  The previous check combined the
                # comparison with ``and len(first_line) != 7``, which let any
                # 7-column sheet through even with wrong or reordered columns.
                first_line = sheet.row_values(0)
                if first_line != expected_header:
                    return {"errcode": 400, "errmsg": u"不支持此格式顺序的输入;正确格式:['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']"}

                q = queue.Queue(maxsize=nrows)
                for row_index in range(1, nrows):
                    rows = sheet.row_values(row_index, start_colx=0, end_colx=None)

                    role = role_names.get(rows[6])
                    if role is None:
                        return {"errcode": 400, "errmsg": u"所写角色不存在,可选:【老师, 学生, 助教, 观察者, 设计者】"}
                    rows[6] = role

                    query = {"user[name]": rows[1], "pseudonym[unique_id]": rows[2], "pseudonym[password]": rows[3],
                             "user[tel]": rows[4], "user[company]": rows[5], "user[time_zone]": "Beijing",
                             "user[locale]": "zh-Hans", "user[terms_of_use]": True, "user[role_name]": rows[6],
                             "user[college_id]": v["user[college_id]"]}

                    q.put(query)

                res = self._user_batch_import(q)
            return {"errcode": 0, "errmsg": str(res)}
    
        def _user_batch_import(self, q):
            """Send one user-creation POST per valid payload waiting in ``q``.

            A payload is sent only if its e-mail, phone and password all pass
            the ``very_*`` validators; payloads that fail are skipped.

            :param q: queue of payload dicts built by ``user_batch_import``.
            :return: the list returned by ``grequests.map`` for the sent requests.
            """
            from app.api.apis import very_email, very_password, very_phone

            header = {"Authorization": "Bearer %s" % AccessToken}
            # The endpoint is the same for every payload, so build it once.
            users_url = API_CANVAS_HOST + '/api/v1/accounts/{account_id}/users'.format(account_id=1)

            request_res = []
            while q.qsize() > 0:
                query = q.get()
                is_valid = (very_email(query["pseudonym[unique_id]"])
                            and very_phone(query["user[tel]"], "tel")
                            and very_password(query["pseudonym[password]"], "password"))
                if not is_valid:
                    # The previous code ran ``del query`` here and then still used
                    # ``query`` below, which raised UnboundLocalError on the first
                    # invalid row.  Skip the payload instead.
                    continue

                request_res.append(grequests.post(users_url, headers=header, data=query))

            return grequests.map(request_res)

    二.asyncio

    1.应用

    import asyncio
    import requests
    # Create a fresh event loop and register it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    x = []

    def ccc():
        """Fetch the page with blocking requests and store the response in x."""
        x.append(requests.get('https://baidu.com'))

    async def t():
        """Run ccc in the loop's default executor and print the awaited value."""
        # ccc has no return statement, so the value printed here is None;
        # the responses themselves are collected in x.
        temp = await loop.run_in_executor(None, ccc)
        print(temp)

    try:
        loop.run_until_complete(asyncio.gather(*(t() for _ in range(3))))
    finally:
        loop.close()
    print(x)

    2.实例

        def user_batch_import(self, file, data):
            """Import students in bulk from an uploaded Excel workbook.

            Every non-empty sheet must start with the exact 11-column header
            row listed in ``expected_header`` below.  Each following row is
            turned into one user-creation payload; the payloads of a sheet are
            handed to ``self._user_batch_import`` through a queue.

            :param file: uploaded workbook, stored via ``Graphics.upload_image``.
            :param data: mapping providing ``context_type`` and ``content_type``.
            :return: an error dict for an unsupported header, otherwise the
                result of ``self._user_batch_import`` for the last processed
                sheet (``None`` when no sheet contained any rows).
            """
            college_id = g.user_info["college_id"]
            res_account_id = self.get_table_data_first(College, dict(id=college_id))
            account_id = res_account_id["account_id"]

            import xlrd

            expected_header = ["*姓名", "*本人手机", "性别", "出生年月", "民族", "籍贯:省/市", "学校所在地区", "学校全称",
                               "监护人姓名", "与学生的关系", "监护人联系电话"]

            show_url = Graphics.upload_image(file, data["context_type"], data["content_type"])
            data_path = Graphics.format_nginx_data_url(show_url)
            book = xlrd.open_workbook(data_path)

            res = None
            for name in book.sheet_names():
                sheet = book.sheet_by_name(name)
                nrows = sheet.nrows
                if nrows == 0:
                    # Nothing to import from an empty sheet.
                    continue

                # The header must match exactly.  The previous check combined the
                # comparison with ``and len(first_line) != 11``, which let any
                # 11-column sheet through even with wrong or reordered columns.
                first_line = sheet.row_values(0)
                if first_line != expected_header:
                    return {"errcode": 400, "errmsg": u"不支持此格式顺序的输入;正确格式:['*姓名', '*本人手机', '性别', "
                                                      u"'出生年月', '民族', '籍贯:省/市', '学校所在地区', '学校全称', "
                                                      u"'监护人姓名', '与学生的关系', '监护人联系电话']"}

                q = queue.Queue(maxsize=nrows)
                for row_index in range(1, nrows):
                    rows = sheet.row_values(row_index, start_colx=0, end_colx=None)

                    # NOTE(review): an empty gender cell maps to 1 and every
                    # non-empty value maps to 0 -- kept as in the original;
                    # confirm this is the intended encoding.
                    rows[2] = 1 if rows[2] == "" else 0

                    # Keep only the part before the first "." -- presumably because
                    # numeric cells arrive as floats such as 13800000000.0.
                    phone = str(rows[1]).split(".")[0]

                    guardian_dict = [{
                        "guardian_name": rows[8], "relationship_with_student": rows[9],
                        "guardian_tel": str(rows[10]).split(".")[0]}]

                    # The phone number doubles as login id and initial password.
                    query = {"user[name]": rows[0], "pseudonym[unique_id]": phone,
                             "user[tel]": phone, "user[gender]": rows[2],
                             "user[birthdate]": rows[3], "user[nation]": rows[4],
                             "user[province]": rows[5], "user[school_position]": rows[6], "user[school_name]": rows[7],
                             "pseudonym[password]": phone, "parent": json.dumps(guardian_dict),
                             "user[time_zone]": "Beijing", "user[locale]": "zh-Hans",
                             "user[terms_of_use]": True, "user[college_id]": college_id,
                             }

                    q.put(query)

                res = self._user_batch_import(q, account_id)
            return res
    View Code
        def _user_batch_import(self, q, account_id):
            """Validate the queued payloads, POST the valid ones and summarise.

            Payloads whose phone number or birthdate validator returns a dict
            are recorded under ``wrong_format`` and not sent.  The remaining
            payloads are posted through ``self.url_res`` on a private event
            loop, and every response is sorted into success or failure.

            :param q: queue of payload dicts built by ``user_batch_import``.
            :param account_id: account id interpolated into the users endpoint.
            :return: ``{"errcode": 0, "errmsg": "ok", "data": {...}}`` where
                ``data`` holds ``wrong_format``, ``success_event`` and
                ``failure_event``; or an ``errcode`` 400 dict as soon as any
                response has HTTP status 403.
            """
            from app.api.apis import import_user_phone, import_user_birthdate

            failure_list = []   # details of every request the API rejected
            success = []        # login ids of the users that were created
            wrong_format = []   # unique ids that failed local validation

            header = {"Authorization": "Bearer %s" % AccessToken}

            request_query = []
            while q.qsize() > 0:
                query = q.get()
                res_phone = import_user_phone(query["pseudonym[unique_id]"])
                res_birthdate = import_user_birthdate(query["user[birthdate]"])
                # A dict result from a validator marks the value as invalid.
                if isinstance(res_phone, dict) or isinstance(res_birthdate, dict):
                    wrong_format.append(query["pseudonym[unique_id]"])
                    continue
                request_query.append(query)

            canvas_url = API_CANVAS_HOST + '/api/v1/accounts/{account_id}/users'.format(account_id=account_id)

            # Run the requests on a dedicated loop and always close it, even if
            # one of the requests raises.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                results = loop.run_until_complete(self.url_res(canvas_url, header, request_query))
            finally:
                loop.close()

            for result in results:
                # Check the status before parsing: a 403 body need not be JSON.
                if result.status_code == 403:
                    return {"errcode": 400, "errmsg": u"无此权限"}

                res_json = result.json()
                if "errors" not in res_json:
                    success.append(res_json["login_id"])
                    continue

                try:
                    failure_list.append({
                        "phone": res_json["errors"]["unique_id"],
                        "message": res_json["errors"]["pseudonym"]["unique_id"][0]["message"],
                    })
                except (KeyError, IndexError, TypeError) as e:
                    # The error payload has a different shape; return it verbatim.
                    failure_list.append({"canvas_errors": res_json, "errmsg_e": str(e)})

            res_dic = {
                "wrong_format": wrong_format,
                "success_event": success,
                "failure_event": failure_list,
            }
            return {"errcode": 0, "errmsg": "ok", "data": res_dic}
    
        async def do_some_work(self, url, header, query):
            """POST ``query`` to ``url`` without blocking the event loop.

            ``requests.post`` is a blocking call.  Calling it directly inside
            a coroutine stalls the loop, so coroutines gathered together would
            run strictly one after another.  Handing the call to the loop's
            default executor lets the requests overlap.

            :param url: endpoint to post to.
            :param header: HTTP headers for the request.
            :param query: form data for the request body.
            :return: the ``requests`` response object.
            """
            import functools

            # Inside a coroutine this returns the loop that is running it.
            loop = asyncio.get_event_loop()
            send = functools.partial(requests.post, url, headers=header, data=query)
            return await loop.run_in_executor(None, send)
    
        async def url_res(self, url, header, request_query):
            """Schedule one ``do_some_work`` call per payload and await them all.

            :param url: endpoint passed to every call.
            :param header: HTTP headers passed to every call.
            :param request_query: iterable of payloads, one request each.
            :return: list of results in the same order as ``request_query``.
            """
            pending = [
                asyncio.ensure_future(self.do_some_work(url, header, payload))
                for payload in request_query
            ]
            return await asyncio.gather(*pending)
    View Code
  • 相关阅读:
    Mac下启动Apache
    Mac OS X中配置Apache
    catransition type
    Block
    mysql 复制表结构和表数据的区别 like 和 select
    mysql kill掉所有的锁表的进程 未验证
    MySQL所有函数及操作符
    linux各种复制命令
    Mac mysql 导入导出数据库
    数据库总结
  • 原文地址:https://www.cnblogs.com/rixian/p/11495697.html
Copyright © 2011-2022 走看看