zoukankan      html  css  js  c++  java
  • 进程间通信

    进程间通信

    """
    管道
    队列:先进先出(管道+锁)
    堆栈:先进后出
    """
    from multiprocessing import Queue
    
    
    q = Queue(5)  # 括号内可以传参数 表示的是这个队列的最大存储数
    # 往队列中添加数据
    q.put(1)
    q.put(2)
    # print(q.full())  # 判断队列是否满了
    q.put(3)
    q.put(4)
    q.put(5)
    # print(q.full())
    # q.put(6)  # 当队列满了之后 再放入数据 不会报错 会原地等待 直到队列中有数据被取走(阻塞态)
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())  # 判断队列中的数据是否取完
    print(q.get())
    print(q.get())
    print(q.empty())
    # print(q.get_nowait())  # 取值 没有值不等待直接报错
    # print(q.get())  # 当队列中的数据被取完之后 再次获取 程序会阻塞 直到有人往队列中放入值
    """
    full
    get_nowait
    empty
    都不适用于多进程的情况
    """

    进程间通信IPC机制

    IPC简介

    (1)IPC是Inter-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。

    from multiprocessing import Process,Queue
    
    def producer(q):
        q.put('hello GF~')
    
    def consumer(q):
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        c = Process(target=consumer, args=(q,))
        p.start()
        c.start()
    
    
    """
    子进程放数据 主进程获取数据
    两个子进程相互放 取数据
    """

    生产者消费者模型

    生产者:生产数据;消费者:消费数据

    举个例子,生产者是做包子的,而消费者是买包子,当生产做的包子比消费者买的包子要多时就会出来供大于求的情况,而如果消费者要买的包子比生产者生产的多时,则会出现供不应求的情况。在程序中也会出现这样的问题,当生产数据的进程比消费数据的进程快和当消费数据的进程比生产数据的进程快时也会出现和上面买卖包子一样的问题,在程序中这两种情况都会影响程序对数据的处理速度,生产者消费者模型的概念就是为了平衡这个问题。

    #建立一个吃包子模型
    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    
    def producer(name,food,q):
        for i in range(5):
            data = '%s生产力%s%s'%(name,food,i)
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            print('%s吃了%s'%(name,data))
            time.sleep(random.random())
    
    if __name__ == '__main__':
        q = Queue()
    
        p = Process(target=producer,args=('egon','馒头',q))
        p1 = Process(target=producer,args=('tank','生蚝',q))
        c = Process(target=consumer,args=('龙胖',q))
        c1 = Process(target=consumer,args=('jerry',q))
        p.start()
        p1.start()
        c.start()
        c1.start()
    #这个模型的问题在于队列本身的一个问题,当生产者生产完包子后就结束了,但消费者在取完包子之后,则进入了等待的状态,所以被卡死在了get()这一步
    #解决的办法是可以在生产者生产完毕后,往队列再发一个结束信号,这样可以让消费者在接到这个信号后停止循环。
    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    
    def producer(name,food,q):
        for i in range(5):
            data = '%s生产力%s%s'%(name,food,i)
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            if data == None:break
            print('%s吃了%s'%(name,data))
            time.sleep(random.random())
    
    if __name__ == '__main__':
        q = Queue()
    
        p = Process(target=producer,args=('egon','馒头',q))
        p1 = Process(target=producer,args=('tank','生蚝',q))
        c = Process(target=consumer,args=('龙胖',q))
        c1 = Process(target=consumer,args=('jerry',q))
        p.start()
        p1.start()
        c.start()
        c1.start()
        p.join()
        p1.join()
        q.put(None)
        q.put(None)

    但是,上述方式在有多个生产者和多个消费者时会变得很麻烦,于是我们要用到一个新的东西JoinableQueue.

    """
    生产者:生产/制造数据的
    消费者:消费/处理数据的
    例子:做包子的,买包子的
            1.做包子远比买包子的多
            2.做包子的远比包子的少
            供需不平衡的问题
    """
    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    
    
    def producer(name,food,q):
        for i in range(10):
            data = '%s生产了%s%s'%(name,food,i)
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            # if data == None:break
            print('%s吃了%s'%(name,data))
            time.sleep(random.random())
            q.task_done()  # 告诉队列你已经从队列中取出了一个数据 并且处理完毕了
    
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        p = Process(target=producer,args=('大厨egon','馒头',q))
        p1 = Process(target=producer,args=('跟班tank','生蚝',q))
        c = Process(target=consumer,args=('许兆龙',q))
        c1 = Process(target=consumer,args=('吃货jerry',q))
        p.start()
        p1.start()
        c.daemon = True
        c1.daemon = True
        c.start()
        c1.start()
        p.join()
        p1.join()
    
        q.join()  # 等到队列中数据全部取出
        # q.put(None)
        # q.put(None)

    线程(代码运行的过程)

    什么是线程?

    线程就是进程的虚拟单位,都是用来帮助我们形象的描述某种事物。

    进程:资源单位(资源分配的最小单位)

    线程:执行单位(线程CPU调度的最小单位)

    如果将内存比喻为工厂,那么进程就相当于工厂车间,而进程则是车间里的流水线,每个进程都自带一个线程(一般把这个自带的线程称为主线程),线程才是真正的执行单位,进程只是在线程运行过程中提供代码运行所需要的资源。

    为什么要有线程:

    开进程:1.申请内存空间,耗资源

        2."拷贝代码",耗资源

    开线程:一个进程内可以起多个线程,并且线程与线程之间数据是共享的。(开线程的开销远远小于开进程的开销)

    进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程。

    """
    什么是线程
        进程线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物
    
        进程:资源单位
        线程:执行单位
            将内存比如成工厂
            那么进程就相当于是工厂里面的车间
            而你的线程就相当于是车间里面的流水线
        ps:每个进程都自带一个线程,线程才是真正的执行单位,进程只是在线程运行过程中
        提供代码运行所需要的资源
    
    
    为什么要有线程
        开进程
            1.申请内存空间  耗资源
            2."拷贝代码"    耗资源
    
        开线程
            一个进程内可以起多个线程,并且线程与线程之间数据是共享的
        ps:开启线程的开销要远远小于开启进程的开销
    
    如何使用线程
    
    
    
    """

    进程和线程的关系 

    线程与进程的区别可以归纳为以下4点:

      1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
      2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
      3)调度和切换:线程上下文切换比进程上下文切换要快得多。
      4)在多线程操作系统中,进程不是一个可执行的实体。

     

    创建线程的两种方式

    from threading import Thread
    import time
    
    def task(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is over'%name)
    # 开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
    t = Thread(target=task,args=('egon',))
    t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    # 小的代码执行完 线程就已经开启了
    print('')
    
    
    
    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('%s is running'%self.name)
            time.sleep(3)
            print('%s is over'%self.name)
    
    t = MyThread('egon')
    t.start()
    print('')

    线程对象及其他方法

    from threading import Thread,current_thread,active_count
    import time
    import os
    
    def task(name,i):
        print('%s is running'%name)
        # print('子current_thread:',current_thread().name)
        # print('子',os.getpid())
        time.sleep(i)
    
        print('%s is over'%name)
    # 开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
    t = Thread(target=task,args=('egon',1))
    t1 = Thread(target=task,args=('jason',2))
    t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t1.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t.join()  # 主线程等待子线程运行完毕
    print('当前正在活跃的线程数',active_count())
    # 小的代码执行完 线程就已经开启了
    print('')
    # print('主current_thread:',current_thread().name)
    # print('主',os.getpid())

    守护线程

    from threading import Thread,current_thread
    import time
    
    
    
    def task(i):
        print(current_thread().name)
        time.sleep(i)
        print('GG')
    # for i in range(3):
    #     t = Thread(target=task,args=(i,))
    #     t.start()
    t = Thread(target=task,args=(1,))
    t.daemon = True
    t.start()
    print('')
    # 主线程运行结束之后需要等待子线程结束才能结束呢?
    """
    主线程的结束也就意味着进程的结束
    主线程必须等待其他非守护线程的结束才能结束
    (意味子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了资源也就销毁了)
    """
    #例子
    from threading import Thread
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)  # 睡眠一秒
        print('end123')
    
    def bar():
        print(456)
        time.sleep(3)  # 睡眠三秒
        print('end456')
    
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True  # 当主线程结束后子线程也会结束
        t1.start()
        t2.start()
        print('main------------')
    >>>:123
            main------------
            end123
            end456
    ##运行完毕并不是终止运行了,主线程会等到其它的非守护线程运行结束之后再结束

    线程间通信

    结论,同一进程下,线程共用的是一个进程中的资源。

    from threading import Thread
    
    
    money = 666
    
    def task():
        global money
        money = 999
    
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)

    互斥锁


    和进程中的锁一样,每次只允许一个线程对数据进行操作,抢到锁的可以进去操作,操作完之后会释放锁,之后其他的进程再进行抢锁,然后操作数据。

    from threading import Thread,Lock
    import time
    
    
    n = 100
    
    def task(mutex):
        global  n
        mutex.acquire()
        tmp = n
        time.sleep(0.1)
        n = tmp - 1
        mutex.release()
    
    t_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(n)

    小例子

    from threading import Thread
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True
        t1.start()
        t2.start()
        print("main-------")
  • 相关阅读:
    05.Zabbix自动化监控
    k8s容器编排
    docker容器
    第一章·ELKstack介绍及Elasticsearch部署
    第二章·Elasticsearch内部分片及分片处理机制介绍
    第三章·Logstash入门-部署与测试
    第四章·Kibana入门-安装,索引添加及界面功能
    第五章·Logstash深入-日志收集
    第六章·Logstash深入-收集java日志
    第七章·Logstash深入-收集NGINX日志
  • 原文地址:https://www.cnblogs.com/AbrahamChen/p/11341975.html
Copyright © 2011-2022 走看看