zoukankan      html  css  js  c++  java
  • Python 操作redis消息队列 多进程消费

    生产端

    import json
    import redis
    
    # Producer side: the code below pushes commands onto a Redis list.
    QUEUE = "code"  # key of the Redis list used as the queue
    
    # Alternative that reads host/db from a project config module (not shown here):
    # redisPool = redis.ConnectionPool(host=config.get_redis_host(), port=6379, db=config.get_redis_db())
    redisPool = redis.ConnectionPool(host='localhost', port=6379, db=8)
    # Client bound to the pool above; send_cmd() uses it for every push.
    client = redis.Redis(connection_pool=redisPool)
    
    
    def send_cmd(seaweed):
        """Serialize *seaweed* as JSON and append it to the tail of the QUEUE list.

        Non-ASCII characters are written as-is (``ensure_ascii=False``) rather
        than as ``\\uXXXX`` escapes.
        """
        payload = json.dumps(seaweed, ensure_ascii=False)
        client.rpush(QUEUE, payload)
    
    
    # Labels 0..99; one command is enqueued per label when run as a script.
    ll = [*range(100)]
    if __name__ == "__main__":
        for k in ll:
            message = {"label": k, 'timd': 20160503, 'timm': 20170430}
            send_cmd(message)

    消费端多进程消费

    import chardet
    import json
    import multiprocessing
    import redis
    
    # Consumer side: connection settings for the Redis list that worker() pops from.
    QUEUE = "code"
    # Alternative that reads host/db from a project config module (not shown here):
    # redisPool = redis.ConnectionPool(host=config.get_redis_host(), port=6379, db=config.get_redis_db())
    redisPool = redis.ConnectionPool(host='localhost', port=6379, db=8)
    # Module-level client; note that worker() builds its own client from redisPool.
    client = redis.Redis(connection_pool=redisPool)
    
    
    # The code below pops commands from Redis and processes them in multiple processes.
    def func(a, b, c):
        """Placeholder job: print ``a`` and ``b`` on one line; ``c`` is accepted but unused."""
        print(*(a, b))
    
    
    def worker(pname):
        """Pop JSON commands from QUEUE and run ``func`` on each until the list is empty.

        Args:
            pname: Label for this worker; used only as a prefix in printed
                failure messages.

        Returns:
            None. Returns when ``LPOP`` yields nothing (queue drained) or when
            Redis raises an error.
        """
        client = redis.Redis(connection_pool=redisPool)

        while True:
            try:
                raw = client.lpop(QUEUE)
            except redis.RedisError as ex:
                # A bare ``except`` used to make a Redis failure look like an
                # empty queue; still stop, but say why.
                print(pname + ":", "redis error, worker stopping:", ex)
                return
            if raw is None:
                # LPOP returned nothing: the queue is drained.
                return

            try:
                # The producer pushes str payloads, which redis-py encodes as
                # UTF-8 by default, so decode deterministically instead of
                # guessing the encoding with chardet.
                text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
                cmd = format_cmd(text)
            except ValueError as ex:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # ValueError subclasses; skip the bad message and keep going
                # rather than letting it kill the worker.
                print(ex)
                print(pname + ":", raw, "计算失败")
                continue

            try:
                func(cmd["label"], cmd['timd'], cmd['timm'])
            except Exception as ex:
                # Best-effort processing: report the failure and move on.
                print(ex)
                print(pname + ":", cmd, "计算失败")
    
    
    def format_cmd(cmd):
        """Parse the JSON text *cmd* and return the decoded Python object."""
        parsed = json.loads(cmd)
        return parsed
    
    
    if __name__ == "__main__":
        # Multi-process consumption: one long-running worker task per pool process.
        pro_num = 5
        pool = multiprocessing.Pool(processes=pro_num)
        pending = []
        # range(1, pro_num) submitted only pro_num - 1 tasks, leaving one pool
        # process idle; submit exactly pro_num workers (PROC001..PROC005).
        for pid in range(1, pro_num + 1):
            pid = "PROC" + str(pid).zfill(3)
            pending.append(pool.apply_async(worker, (pid,)))
        pool.close()
        pool.join()
        # apply_async stores a worker's exception in its result; fetch each
        # result so a crashed worker is re-raised here instead of being lost.
        for result in pending:
            result.get()
  • 相关阅读:
    04
    04 : Linux时间戳与日期相互转换
    docker rmi 删除镜像
    docker restart 命令使用
    docker rename 命令使用
    docker pull push命令使用
    docker ps 命令使用
    docker port 命令使用
    docker pause 命令使用
    docker logs 命令使用
  • 原文地址:https://www.cnblogs.com/zhaoyingjie/p/12742860.html
Copyright © 2011-2022 走看看