zoukankan      html  css  js  c++  java
  • redis实现生产消费模式

    生产消费者模式与python+redis实例运用(基础篇)

    生产者消费者模式:img

    生产者消费者模式,那么必须要有一个消费者(consumer)和一个生产者(producer),设计时候需要考虑的问题:

    • 生产者的速度大于消费者的速度,存储中介中只能容纳一定的数据量

    • 消费者的速度大于生产者的速度

    解决以上问题:加锁或者sleep或者其他方式来解决。

    我们可以让消费者每次取的时候看看存储中介中是否有值,没有值的话就等待一会再取,生产者每次往存储中介中放数据的时候看一下是否快满了,如果快满了也一样睡眠一会再看是否可以放。针对消费者消费过慢的情况,我们可以在代码中开多进程和协程来解决这个问题。

    解决方案一、sleep

    生产者(producer)

    import time
    import redis
    pool = redis.ConnectionPool(host='localhost', port=6379, db=1, decode_responses=True)
    r = redis.Redis(connection_pool=pool)
    
    
    def producer(i):
        length = r.llen("goods2")
        print(length)
        if length > 5000:
            print("长度过大睡一会")
            time.sleep(1)
            product(i)
        else:
            #生产者
            r.lpush("goods2", "good1"+str(i))
            print("加入一个值睡一会")
            # time.sleep(5)
            
            
    if __name__ == '__main__':
        # 此处表示循环10000次,往redis里面放10000次数据
        for i in range(10000):
            product(i)
    producer

    消费者(consumer)

    import time
    import redis
    
    
    pool = redis.ConnectionPool(host='localhost', port=6379, db=1, decode_responses=True)
    r = redis.Redis(connection_pool=pool)
    
    
    def consumer():
        length = r.llen("goods2")
        print(length)
        while length > 0:
            # 消费者
            goods = r.lpop("goods2")
            print(goods)
            if str(goods) == "None":
                print("无值多等等")
                time.sleep(10)
        else:
            print("无值等等")
            time.sleep(10)
            consumer()
    
    
    if __name__ == '__main__':
        consumer()
    consumer

    生产消费者模式与python+redis实例运用(中级篇)

    如果消费者的速度跟不上生产者,会浪费大量的时间去等待。可以从多进程程或者协程去解决这个问题。

    简单模板

    from multiprocessing import Process
    import time
    
    
    def test(i):
        while True:
            print("我是子进程"+str(i))
            time.sleep(2)
    
            if i == 1:
                time.sleep(10)
            print("我是进程" + str(i))
    # print[(x,y) for x in range(10) if x%2 if x>3 for y in range(10) if y > 7 if y != 8]
    
    
    if __name__ == '__main__':
        processes = []
        for i in range(3):
            p = Process(target=test, args=(i,))
            p.start()
            processes.append(p)
    
        print(processes)
        for p in processes:
            p.join()
    多进程模板

    进程1和进程0,2是相对独立的, 上面的就是多进程+消费者模式的雏形 。

    生产者(producer):不变。还是使用基础篇中的producer

    消费者:

    消费者开启多进程:多个进程争抢同一个资源。给资源加锁, redis会话队列上,当某个进程拿资源的时候,redis会话队列加上锁,保证其他进程拿不到这个资源,当这个进程拿完资源后,释放锁,让其他进程去抢占资源。

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    
    import time
    import redis
    from multiprocessing import Process, Lock
    
    # 创建连接池
    pool = redis.ConnectionPool(host='localhost', port=6379,
                                db=1, decode_responses=True)
    r = redis.Redis(connection_pool=pool)
    
    
    def consumer(lock):
        length = r.llen("goods2")
        print(length)
        while True:
            # 获取锁 1号位置
            lock.acquire()
            if length > 0:
                # 3号位置
                goods = r.lpop("goods2")
                # 获得资源释放锁
                lock.release()
                try:
                    data = goods
                    print(data)
                    if str(goods) == "None":
                        print("无值多等等")
                        time.sleep(2)
                except:
                    print("无值等等...")
                    time.sleep(2)
                    consumer(lock)
            else:
                # 2号位置
                print('无值等等...')
                time.sleep(10)
                consumer(lock)
    
    
    if __name__ == "__main__":
        # 创建全局锁
        lock = Lock()
        # 进程列表
        processes = []
        for i in range(10):
            # 创建进程
            p = Process(target=consumer, args=(lock,))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
        print('处理完毕')
    有问题的consumer

    PS: 存在一个问题。就是当先启动消费者时,再启动生产者是会出现死锁的情况。修改过后的消费者如下:

    def consumer(lock):
        while True:
            # 获取锁
            lock.acquire()
            length = r.llen("goods2")
            print(length)
            if length > 0:
                goods = r.lpop("goods2")
                # 获得资源释放锁
                lock.release()
                try:
                    data = goods
                    print(data)
                    if str(goods) == "None":
                        print("无值多等等")
                        time.sleep(2)
                except:
                    print("无值等等...")
                    time.sleep(2)
                    consumer(lock)
            else:
                print('无值等等...')
                time.sleep(10)
                lock.release()
                consumer(lock)
    改正后的consumer
  • 相关阅读:
    2月11日
    亚特兰蒂斯
    080215 晴
    2月9日
    2月6日
    2月10日
    080208 晴(0,50)
    关于春晚
    (15,50)
    恍然大悟
  • 原文地址:https://www.cnblogs.com/854594834-YT/p/14014066.html
Copyright © 2011-2022 走看看