zoukankan      html  css  js  c++  java
  • 赵凡导师并发知识第一次分享观后感

    主要是渗透了解 asyncio  相关概念

    提及部分描述:1.线程(阻塞,非阻塞)

           2.线程之间通信(线程之间的传递(import queue))

              3.进程之间通信(进程之间通信from multiprocessing import Process,Queue  用进程Queue)单独详解

    举例图:

    场景描述:

    数据库的要操作的表的信息为:

         当多个请求都到数据库操作接口程序的时候,针对同一个name的count进行增加或者减少,就要保证操作的同一个时刻只有一个可以去获取count的值并进行update操作,所以我是在这一步增加了锁,因为使用aiohttp写的,所以想要在这里也用了aiorwlock,但是在我测试的过程中发现了,当一个协程获取锁还没释放锁的时候,另外一个协程也获取到锁,下面我是具体的代码

    核心程序:

     

    class CntHandler(object):
    
        def __init__(self, db, loop):
            self.db = db
            self.loop = loop
            self.company_lock = {}
    
        def response(self, request, msg):
            peer = request.transport.get_extra_info('peername')
            logging.info("request url[%s] from[%s]: %s", request.raw_path, peer, msg)
            origin = request.headers.get("Origin")
            if origin is not None:
                headers = {"Access-Control-Allow-Origin": origin, "Access-Control-Allow-Credentials": "true"}
                resp = web.Response(text=util.dictToJson(msg), content_type='application/json', headers=headers)
            else:
                resp = web.Response(text=util.dictToJson(msg), content_type='application/json')
            return resp
    
        async def cnt_set(self, request):
            """
            用于设置company表中的count值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = post.get("cnt")
            sql = "update shield.company set count=%s where name=%s"
            args_values = [cnt, company_name]
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                msg = dict()
                po_sql = "select * from shield.company where name=%s"
                po = await self.db.get(po_sql, company_name)
                if not po:  # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                res = await self.db.execute(sql, args_values)
                if not isinstance(res, int):
                    logging.error("sql update is err:", res)
                    msg["code"] = 403
                    msg["reason"] = "set fail"
                    return self.response(request, msg)
                logging.info("company [%s] set cnt [%s] is success", company_name, cnt)
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)
    
        async def cnt_inc(self, request):
            """
            用于增加company表中的count值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = int(post.get("cnt", 0))
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                uuid_s = uuid.uuid1().hex
                logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
                msg = dict()
                sql = "select * from shield.company where name=%s"
                po = await self.db.get(sql, company_name)
                if not po:  # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                old_cnt = po.get("count")
                po_cnt = int(po.get("count"))
                res = po_cnt + cnt
                update_sql = "update shield.company set count=%s where name=%s"
                args_values = [res, company_name]
                update_res = await self.db.execute(update_sql, args_values)
                if not isinstance(update_res, int):  # 数据库update失败
                    logging.error("sql update is err:", update_res)
                    msg["code"] = 403
                    msg["reason"] = "inc fail"
                    return self.response(request, msg)
                logging.info("uuid [%s] lock [%s] company [%s] inc cnt [%s] old cnt [%s]  true will is [%s] success", uuid_s,id(rwlock), company_name, cnt, old_cnt, res)
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)
    
        async def cnt_dec(self, request):
            """
            用于减少company表中count的值
            :param request: 
            :return: 
            """
            post = await request.post()
            logging.info('post %s', post)
            company_name = post.get("company")
            cnt = int(post.get("cnt", 0))
            rwlock = self.company_lock.get(company_name, "")
            if not rwlock:
                rwlock = aiorwlock.RWLock(loop=self.loop)
                self.company_lock[company_name] = rwlock
            async with rwlock.writer:
                uuid_s = uuid.uuid1().hex
                logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
                msg = dict()
                sql = "select * from shield.company where name=%s"
                po = await self.db.get(sql, company_name)
                if not po:      # 找不到企业
                    logging.error("not found company name [%s]", company_name)
                    msg["code"] = 404
                    msg["code"] = "not found company"
                    return self.response(request, msg)
                po_cnt = int(po.get("count"))
                old_cnt = po.get("count")
                if po_cnt == 0:
                    logging.error("company [%s] cnt is 0", company_name)
                    msg["code"] = 400
                    msg["reason"] = "cnt is 0"
                    return self.response(request, msg)
                if po_cnt < cnt:  # 数据库余额不足
                    logging.error("company [%s] count is not enough", company_name)
                    msg["code"] = 405
                    msg["reason"] = "count is not enough"
                    return self.response(request, msg)
                res = po_cnt - cnt
                update_sql = "update shield.company set count=%s where name=%s"
                args_values = [res, company_name]
                update_res = await self.db.execute(update_sql, args_values)
                if not isinstance(update_res, int): # 执行update 失败
                    logging.error("sql update is err:", update_res)
                    msg["code"] = 403
                    msg["reason"] = "inc fail"
                    return self.response(request, msg)
                logging.info("uuid [%s] lock [%s] company [%s] dec cnt [%s] old cnt [%s] true will is [%s] success",uuid_s,id(rwlock), company_name, cnt, old_cnt, res)
    
                msg["code"] = 200
                msg["reason"] = "ok"
                return self.response(request, msg)

    上面代码出问题的代码是在增加和减少的时候:

    async with rwlock.writer:

    在一个协程还没有释放锁的时候,另外一个操作也就进来了,到之后在测试并发的时候,对同一个name的count进行操作导致最后的count值不符合的问题

    这个完整的代码地址:https://github.com/pythonsite/test_aiorwlock

  • 相关阅读:
    C#利用反射动态调用类及方法
    系统程序监控软件
    SQL server 2008 安装和远程访问的问题
    sql server 创建临时表
    IIS 时间问题
    windows 2008 安装 sql server 2008
    sql server xml nodes 的使用
    Window 7sp1 安装vs2010 sp1 打开xaml文件崩溃
    CSS资源网址
    Could not load type 'System.ServiceModel.Activation.HttpModule' from assembly 'System.ServiceModel, Version=3.0.0.0
  • 原文地址:https://www.cnblogs.com/cjj-zyj/p/10018743.html
Copyright © 2011-2022 走看看