zoukankan      html  css  js  c++  java
  • python 结合redis 队列 做一个例子

    结合redis 队列 做了一个例子

    #!/usr/bin/env python
    # coding: utf-8
    # @Time    : 2018/12/21 0021 13:57
    # @Site    : 
    # @File    : demos.py
    # @Software: PyCharm
    import MySQLdb
    import redis
    import json
    import os, time
    import threading
    from multiprocessing import Pool, Process
    import os, time, random
    import sys
    
    reload(sys)
    sys.setdefaultencoding('utf8')
    
    
    class InsertData():
        def __init__(self):
            # 去掉一些无用信息
            self.__list_industry = []
            self.__has_many = []
            self.__list_xczx = []
            self.__list_cxcy = []
            self.__list_industry_dict = {'test': self.__list_xczx }
            self.__dict_industry = {'test': 212}
            self.db = MySQLdb.connect(host="127.0.0.1", port=3306, user="root", passwd="123456", db="ww",
                                      charset='utf8')
            redisPool = redis.ConnectionPool(host='localhost', port=6379)
            self.re_queue = redis.Redis(connection_pool=redisPool)
            self.re_queue2 = redis.Redis(connection_pool=redisPool)
    
        def __get_dict_industry(self):
            industry_name = self.__list_industry_dict.keys()
            if len(industry_name) == 1:
                industry_name = str(tuple(industry_name)).replace(",","")
            elif len(industry_name) > 1:
                industry_name = str(tuple(industry_name))
            else:
                return
            sql_industry = "select industry_name,industry_id from zzh_industry where industry_name in {}".format(industry_name)
            cursor3 = self.db.cursor()
            cursor3.execute(sql_industry)
            result_list = cursor3.fetchall()
            for result in result_list:
                self.__dict_industry[result[0]] = result[1]
            cursor3.close()
    
        def inser_industry(self):
            dta = """xx、xxx"""
            data = dta.split("、")
            for index, da in enumerate(data):
                industry_code = 100001 + index
                sqlStr = """insert into xx(industry_name,industry_pid,industry_code,industry_sort,is_lock) VALUES('{industry_name}',211,'{industry_code}',{industry_sort},1) ;""".format(
                    industry_name=da, industry_code=industry_code, industry_sort=index + 1)
                print sqlStr
    
        def put_redis(self):
            cursor = self.db.cursor()
            item_sql = """SELECT item_title,item_id from xxx"""
            cursor.execute(item_sql)
            result_list = cursor.fetchall()
            num = 1
            for result in result_list:
                data = {"itemTitle": result[0], "itemId": result[1]}
                self.re_queue.lpush("item", json.dumps(data))
                num += 1
            print ("put over", num)
    
        def get_redis(self):
            nums = 1
            resultNum = 0
            cursor_get = self.db.cursor()
    
            while True:
                result = self.re_queue.rpop("item")
                if not result:
                    time.sleep(1)
                    if resultNum == 10:
                        break
                    else:
                        print "resultNum", resultNum
                        resultNum += 1
                    continue
                try:
                    resultNum = 0
                    result = json.loads(result)
                    value_list = []
                    for strkey in self.__list_industry_dict.keys():
                        if strkey in self.__has_many:
                            for __strkey in self.__list_industry_dict[strkey]:
                                if __strkey in result["itemTitle"]:
                                    value_list.append(strkey)
                                    break
                        if strkey in result["itemTitle"]:
                            value_list.append(strkey)
                    value_list = set(value_list)
                    item_id = result["itemId"]
                    if value_list:
                        print result["itemTitle"]
                    for value in value_list:
                        nums += 1
                        # select_sql = "select id from zzh_industry_item where item_id={} and industry_id={} limit 1".format(item_id,self.__dict_industry[value])
                        # cursor_get.execute(select_sql)
                        # if cursor_get.fetchone():
                        #     print ("reseat",select_sql)
                        #     continue
                        sql_insert = "insert into zzh_industry_item(item_id,industry_id)values ({item_id},{industry_id})".format(
                            item_id=item_id, industry_id=self.__dict_industry[value])
                        self.re_queue2.lpush("sqls", str(sql_insert))
                except Exception as e:
                    print e
            cursor_get.close()
            print ("put over")
    
        def test(self):
            cursor2 = self.db.cursor()
            count = 0
            breakNum = 0
            num = 0
            try:
                while True:
                    sql = self.re_queue2.rpop("sqls")
                    if sql:
                        num += 1
                        breakNum = 0
                        print sql
                        try:
                            cursor2.execute(sql)
                            if count == 500:
                                self.db.commit()
                                count = 0
                            else:
                                count += 1
                        except Exception as e:
                            print e
                    if not sql:
                        time.sleep(1)
                        if breakNum == 10:
                            break
                        else:
                            print "breakNum", breakNum
                            breakNum += 1
            finally:
                print ("insertSql", num)
                self.db.commit()
                self.db.close()
    
    
    if __name__ == '__main__':
        items = InsertData()
        print('Parent process %s.' % os.getpid())
        t1 = threading.Thread(target=items.put_redis)
        t2 = threading.Thread(target=items.get_redis)
        t3 = threading.Thread(target=items.test)
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()
    
    
  • 相关阅读:
    看完了红米5 Plus发布会,我觉得魅蓝Note6降价降多了
    红米5/红米5 Plus逼出最强魅蓝Note6?降价后已成性价比神机
    java.lang.NoClassDefFoundError: org/apache/ibatis/mapping/DatabaseIdProvider
    mac tree命令
    spring boot集成dubbo
    玩转Spring Boot 集成Dubbo
    Linux下复制粘贴快捷键
    用创业舞动飞扬的青春
    SpringMVC 学习笔记(一) Hello World
    从 Kubernetes 谈容器网络
  • 原文地址:https://www.cnblogs.com/libaibuaidufu/p/10170773.html
Copyright © 2011-2022 走看看