zoukankan      html  css  js  c++  java
  • IPC生产者与消费者模型加线程

    共享内存方式(IPC进程间通信)

    一个进程在内存中就是对应着一块独立的内存空间

    进程间通信的方式:

    1.管道:只能单向通讯,数据都是二进制

    2.文件:在硬盘上创建共享文件

      优点:数据量无限制

      缺点:传输速度慢

    3.socket:编程复杂度高

    4.共享内存:必须由操作系统来分配

      优点;传输速度快

      缺点:数据量较小

    队列:

      是一种特殊的数据结构,先进先出

    堆栈:先进后出,像叠衣服一样

    扩展:函数嵌套调用时,顺序是先进后出,也称之为函数栈

    from multiprocessing import Queue
    
    q = Queue(3)    # 建队列  不指定maxsize 则没有数量限制
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())   # 1
    q.put(4)    # 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())    # 如果队列已经空了,在调用get时将进入阻塞状态 直到有人从存储了新的数据到队列中 才会继续
    
    #block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列为空时 抛出异常
    q.get(block=True,timeout=2)
    # block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列满了时 抛出异常
    # q.put("123",block=False,)
    # timeout 表示阻塞的超时时间 ,超过时间还是没有值或还是没位置则抛出异常  仅在block为True有效
    View Code

    生产者消费者模型:

    生产者:生产数据的一方

    消费者:处理数据的一方

    出现问题的原因:就是因为生产者与消费者之间的供需不平衡

    解决问题:用来双方是耦合,消费者必须等待生产者生产完毕才能够开始处理。

    选择将双方分开来,一方只负责生产,另一方指负责取,需要一个共同的容器作为中间介质。

    import time
    import random
    from multiprocessing import Process,Queue
    def task1(q):
        for i in range(10):
            time.sleep(random.randint(0, 2))
            print("正在烧%s盘菜" % i)
    
            rose = "%s盘菜" % i
            q.put(rose)
    
    
    def task2(q):
        for i in range(10):
            rose = q.get()
            print(rose,"正在消费")
            time.sleep(random.randint(0, 2))
            print(rose,"消费完成")
    
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=task1, args=(q,))
        p2 = Process(target=task2, args=(q,))
        p1.start()
        p2.start()
    View Code

    还有问题:

    生产者结束后,消费者还在循环吃,并不知道已经结束了。会在原地一致等待生产者生产数据。

    解决:

    用到了joinablequeue这个模块

    意思:可以被join的队列

    import time
    import random
    from multiprocessing import JoinableQueue, Process
    
    
    def make_hotdog(name, q):
        for i in range(5):
            time.sleep(random.randint(1, 3))
            print("%s生产了热狗%s" % (name, i))
            q.put("%s的%s号热狗" % (name, i))
    
    def eat_hotdog(name, q):
        while True:
            hotdog = q.get()
            time.sleep(random.randint(1, 3))
            print("%s吃掉了%s" % (name, hotdog))
            # 每次处理完成一个数据,必须记录该数据
            q.task_done()
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        p1 = Process(target=make_hotdog, args=("jerry", q))
        p2 = Process(target=make_hotdog, args=("monkey", q))
        p3 = Process(target=make_hotdog, args=("owen", q))
    
        c1 = Process(target=eat_hotdog, args=("思聪", q))
        c1.daemon = True
        c2 = Process(target=eat_hotdog, args=("健林", q))
        c2.daemon = True
    
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        # 明确商家生产完毕,在明确消费者吃完了,就算结束
        p1.join()
        print("第一家生产完毕")
    
        p2.join()
        print("第二家生产完毕")
    
        p3.join()
        print("第三家生产完毕")
    
        # 消费者吃完了
        q.join()
        print("消费者吃完了")

    多线程:

    什么是线程:线程指的是一条流水线的工作过程。

    线程是cpu最小的执行单位,是操作系统最小的调度单位。

    对比于进程而言,进程只是一个资源单位,为线程在执行过程需要用到的数据提供资源,就好比进程是一个车间,而线程就是车间里面的流水线。

    特点:
    1.每个进程都会至少有一个线程,是由操作系统分配的。

    2.进程内可以有多个线程

    3.一个进程内的多个线程资源是共享的

    4.线程的创建开销远比进程小的多。

    主线程与子线程的区别:

    线程之间是平等的

    2.主线程由操作系统分配,子线程由程序开启

    3.主线程的代码执行完毕,主线程并没有结束而是会等其他子线程代码运行完毕后,线程才会结束。

     开启线程的2种方式(和进程一样的,只是不用写在__main__里面

    # 方式一 直接实例化Thread类
    from threading import Thread
    
    def task():
        print("子线程 is running")
    
    t = Thread(target=task)
    t.start()
    # 执行的顺序不固定,一般来说,开启子线程的速度远远高于继续执行主线程的速度
    print("主线程 is runnning")
    #
    # # 方式二:自定义类继承thread类
    class MyThread(Thread):
        def run(self):
            print("子线程 is running")
    
    t = MyThread()
    t.start()
    print("主线程 is running")
    View Code

    同一个进程之间数据共享

    import time,os
    from threading import Thread
    
    x = 100
    
    def task():
        global x
        x = 10
        print("线程",os.getpid())   # 与主线程的pid相同
    
    
    t = Thread(target=task)
    t.start()
    # 主线程等待子线程执行完毕
    time.sleep(3)
    print(x)   # 10
    print("主线程",os.getpid())

    守护线程:

    与守护进程的区别就是,守护进程在被守护进程结束后会立即结束并不会等其他非守护进程,

    而守护线程则是会等待其他非守护线程结束而结束,当然2个都可以提前结束、

    import time
    from threading import Thread
    
    
    def task1():
        print("子进程1 is running")
        time.sleep(3)
        print("子进程1 is over")
    
    def task2():
        print("子进程2 is running")
        time.sleep(2)
        print("子进程2 is over")
    
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)
    t1.daemon = True
    t1.start()
    t2.start()
    print("主进程 Game Over")

    互斥锁:

    多线程最主要的特征之一就是:一个进程内的多个线程资源共享。

    资源共享就是带来竞争问题,所以要加锁

    from threading import Lock, enumerate,Thread
    
    import time
    
    num = 10
    lock = Lock()
    def task():
    
    for i in range(10):
        t = Thread(target=tas
        global num
        lock.acquire()
        a = num
        time.sleep(0.1)
        num = a - 1
        lock.release()k)
        t.start()
    for t in enumerate()[1:]:
        t.join()
    print(num)
  • 相关阅读:
    Python 描述符(descriptor) 杂记
    Celery 使用简介
    异步任务神器 Celery 简明笔记
    高性能框架gevent和gunicorn在web上的应用及性能测试
    Flask + Gunicorn + Nginx 部署
    Mysql查看最大连接数和修改最大连接数
    配置 influxDB 鉴权及 HTTP API 写数据的方法
    Java 字符串拼接 五种方法的性能比较分析 从执行100次到90万次
    linux端口开放指定端口的两种方法
    java自带的监控工具VisualVM一
  • 原文地址:https://www.cnblogs.com/xinfan1/p/11340441.html
Copyright © 2011-2022 走看看