zoukankan      html  css  js  c++  java
  • 关于python协程中aiorwlock 使用问题

    最近工作中多个项目都开始用asyncio aiohttp aiomysql aioredis ,其实也是更好的用python的协程,但是使用的过程中也是遇到了很多问题,最近遇到的就是

    关于aiorwlock 的问题,在使用中碰到了当多个协程同时来请求锁的时候 在其中一个还没释放锁的时候,另外一个协程也获取到锁,这里进行整理,也希望知道问题你解决方法的,一起讨论一下,正好最近经常用到协程的东西,所以准备建一个群,也欢迎大家一起进来讨论python协程的内容,群号:692953542

    关于场景的描述

    数据库的要操作的表的信息为:

    id name nickname count flag crdate
    1 800100 aa 100 1 2018-11-18 10:07:22
    2 800101 bb 200 1 2018-11-18 10:07:23

    当多个请求都到数据库操作接口程序的时候,针对同一个name的count进行增加或者减少,就要保证操作的同一个时刻只有一个可以去获取count的值并进行update操作,所以我是在这一步增加了锁,因为使用aiohttp写的,所以想要在这里也用了aiorwlock,但是在我测试的过程中发现了,当一个协程获取锁还没释放锁的时候,另外一个协程也获取到锁,下面我是具体的代码

    程序代码

    核心的处理类:

    class CntHandler(object):
    
        def __init__(self, db, loop):
            self.db = db
            self.loop = loop
            self.company_lock = {}
    
        def response(self, request, msg):
            peer = request.transport.get_extra_info('peername')
            logging.info("request url[%s] from[%s]: %s", request.raw_path, peer, msg)
            origin = request.headers.get("Origin")
            if origin is not None:
                headers = {"Access-Control-Allow-Origin": origin, "Access-Control-Allow-Credentials": "true"}
                resp = web.Response(text=util.dictToJson(msg), content_type='application/json', headers=headers)
            else:
                resp = web.Response(text=util.dictToJson(msg), content_type='application/json')
            return resp
    
        async def cnt_set(self, request):
            """
            用于设置company表中的count值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = post.get("cnt")
            sql = "update shield.company set count=%s where name=%s"
            args_values = [cnt, company_name]
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                msg = dict()
                po_sql = "select * from shield.company where name=%s"
                po = await self.db.get(po_sql, company_name)
                if not po:  # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                res = await self.db.execute(sql, args_values)
                if not isinstance(res, int):
                    logging.error("sql update is err:", res)
                    msg["code"] = 403
                    msg["reason"] = "set fail"
                    return self.response(request, msg)
                logging.info("company [%s] set cnt [%s] is success", company_name, cnt)
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)
    
        async def cnt_inc(self, request):
            """
            用于增加company表中的count值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = int(post.get("cnt", 0))
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                uuid_s = uuid.uuid1().hex
                logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
                msg = dict()
                sql = "select * from shield.company where name=%s"
                po = await self.db.get(sql, company_name)
                if not po:  # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                old_cnt = po.get("count")
                po_cnt = int(po.get("count"))
                res = po_cnt + cnt
                update_sql = "update shield.company set count=%s where name=%s"
                args_values = [res, company_name]
                update_res = await self.db.execute(update_sql, args_values)
                if not isinstance(update_res, int):  # 数据库update失败
                    logging.error("sql update is err:", update_res)
                    msg["code"] = 403
                    msg["reason"] = "inc fail"
                    return self.response(request, msg)
                logging.info("uuid [%s] lock [%s] company [%s] inc cnt [%s] old cnt [%s]  true will is [%s] success", uuid_s,id(rwlock), company_name, cnt, old_cnt, res)
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)
    
        async def cnt_dec(self, request):
            """
            用于减少company表中count的值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = int(post.get("cnt", 0))
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                uuid_s = uuid.uuid1().hex
                logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
                msg = dict()
                sql = "select * from shield.company where name=%s"
                po = await self.db.get(sql, company_name)
                if not po:      # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                po_cnt = int(po.get("count"))
                old_cnt = po.get("count")
                if po_cnt == 0:
                    logging.error("company [%s] cnt is 0", company_name)
                    msg["code"] = 400
                    msg["reason"] = "cnt is 0"
                    return self.response(request, msg)
                if po_cnt < cnt:  # 数据库余额不足
                    logging.error("company [%s] count is not enough", company_name)
                    msg["code"] = 405
                    msg["reason"] = "count is not enough"
                    return self.response(request, msg)
                res = po_cnt - cnt
                update_sql = "update shield.company set count=%s where name=%s"
                args_values = [res, company_name]
                update_res = await self.db.execute(update_sql, args_values)
                if not isinstance(update_res, int): # 执行update 失败
                    logging.error("sql update is err:", update_res)
                    msg["code"] = 403
                    msg["reason"] = "inc fail"
                    return self.response(request, msg)
                logging.info("uuid [%s] lock [%s] company [%s] dec cnt [%s] old cnt [%s] true will is [%s] success",uuid_s,id(rwlock), company_name, cnt, old_cnt, res)
    
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)

    上面代码出问题的代码是在增加和减少的时候:

    async with rwlock.writer:

    在一个协程还没有释放锁的时候,另外一个操作也就进来了,到之后我在测试并发的时候,对同一个name的count进行操作导致最后的count值不符合的问题

    可能是我本身代码的问题,或者我哪里处理的不对,欢迎大家一起讨论

    这个完整的代码地址:https://github.com/pythonsite/test_aiorwlock

  • 相关阅读:
    SQLMAP注入教程-11种常见SQLMAP使用方法详解
    VS2012/2013/2015/Visual Studio 2017 关闭单击文件进行预览的功能
    解决 IIS 反向代理ARR URLREWRITE 设置后,不能跨域跳转 return Redirect 问题
    Spring Data JPA one to one 共享主键关联
    JHipster 问题集中
    Spring Data JPA 定义超类
    Spring Data JPA查询关联数据
    maven命名
    maven仓库
    Jackson读取列表
  • 原文地址:https://www.cnblogs.com/zhaof/p/9977179.html
Copyright © 2011-2022 走看看