zoukankan      html  css  js  c++  java
  • 并发编程(二)线程

    1.队列的概念

    队列:先进先出

    堆栈:先进后出(塞衣服)

    Python中用Queue()代表队列。需要导入multiprocessing模块

    创建队列

    from multiprocessing import Queue
    q=Queue(5)
    

    队列:管道+锁。数据被取走,就没了

    from multiprocessing import Queue
    q = Queue(5)  # 括号内可以传参数 表示的是这个队列的最大存储数
    # 往队列中添加数据
    q.put(1) 
    q.put(2)
    # print(q.full())  # 判断队列是否满了
    q.put(3)
    q.put(4)
    q.put(5)
    # print(q.full())
    # q.put(6)  # 当队列满了之后 再放入数据 不会报错 会原地等待 直到队列中有数据被取走(阻塞态)
    
    Queue的括号内参数:
    class Queue(object):
    #点开源码这是一个类,不填,默认的数值非常大
    
    full()判断队列是否满了
    q.put(1)
    q.put(2)
    print(q.full())  # 判断队列是否满了,注意他的摆放顺序
    
    put是一个一个添加值.当队列满了一个不报错,进入阻塞太进行等待
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)
    
    取值get()、get_nowait()

    get()有个特点:等待:当队列中的数据被取完之后 再次获取 程序会阻塞 直到有人往队列中放入值

    get_nowait()顾名思义,只要没值就报错。

    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())  # 判断队列中的数据是否取完
    print(q.get())
    print(q.get())
    print(q.empty())
    print(q.get_nowait())# 取值 没有值不等待直接报错
    

    empty:判断队列中的数据是否取完

    总结:

    full、get_nowait、empty都不适用多进程的情况,因为都是判断当前时间队列有无值

    进程间通信IPC机制(通过队列实现通信!)

    之前讲到进程间数据是分隔的,不在同一内存空间,但是提到过可以使用其他方法进行通信
    该方法就是"队列"

    from multiprocessing import Process,Queue
    
    def producer(q):
        q.put('hello GF~')
    
    def consumer(q):
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        c = Process(target=consumer, args=(q,))
        p.start()
        c.start()
        
    

    子进程放数据 主进程获取数据
    两个子进程相互放 取数据

    上述程序结果为:hello GF~
    

    消费者生产者模型

    生产者:生产/制造数据的
    消费者:消费/处理数据的
    例子:做包子的,买包子的
    1.做包子远比买包子的多
    2.做包子的远比包子的少
    供需不平衡的问题
    最开始版本

    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    def producer(name,food,q):
        for i in range(10):
            data = '%s生产了%s%s'%(name,food,i)
            time.sleep(random.random())
            q.put(data)
            print(data)
    def consumer(name,q):
        while True:
            data = q.get()
            print('%s吃了%s'%(name,data))
            time.sleep(random.random())
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=('大厨egon','馒头',q))
        p1 = Process(target=producer,args=('跟班tank','生蚝',q))
        c = Process(target=consumer,args=('许兆龙',q))
        c1 = Process(target=consumer,args=('吃货jerry',q))
        p.start()
        p1.start()
    
        c.start()
        c1.start()
    

    出现了停不了程序的情况。
    解决1:往队列里面塞一个None,get到一个None就停止
    但是又出现问题,队列:管道+锁。数据被取走,就没了,有两个消费者,因此仍然停止不了。因此要两个q.put(None),如果有多个消费者,需要多个put(None)
    总结:方案不可取

    解决2 :joinableQueue能够等待的队列
    检测队列是否有值(多进程意思是,现在有值将来有没有值),如果没值,通过守护进程,主进程结束,子进程挂掉
    q.join(专门用在多进程,empty只能用在单进程)里面有一个自检的机制能够检测队列中有多少个数据。等待队列数据已经被完全取出

    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    
    def producer(name,food,q):
        for i in range(10):
            data = '%s生产了%s%s'%(name,food,i)
            time.sleep(random.random())
            q.put(data)
            print(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            if data == None:break
            print('%s吃了%s'%(name,data))
            time.sleep(random.random())
            q.task_done()  # 告诉队列你已经从队列中取出了一个数据 并且处理完毕了
    
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        p = Process(target=producer,args=('大厨egon','馒头',q))
        p1 = Process(target=producer,args=('跟班tank','生蚝',q))
        c = Process(target=consumer,args=('许兆龙',q))
        c1 = Process(target=consumer,args=('吃货jerry',q))
        p.start()
        p1.start()
        c.daemon = True
        c1.daemon = True
        c.start()
        c1.start()
        p.join()
        p1.join()#主进程代码等待子进程运行结束 才继续运行
    
        q.join()  # 等到队列中数据全部取出
    

    创建进程:

    p = Process(target=producer,args=('大厨egon','馒头',q))
    p.start()
    

    线程

    什么是线程?

    进程线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物

    进程:资源单位
    线程:执行单位
        将内存比如成工厂
        那么进程就相当于是工厂里面的车间
        而你的线程就相当于是车间里面的流水线
    ps:每个进程都自带一个线程,线程才是真正的执行单位,进程只是在线程运行过程中
    提供代码运行所需要的资源
    
    为什么要有线程?
    开进程
        1.申请内存空间  耗资源
        2."拷贝代码"    耗资源
    
    开线程
        一个进程内可以起多个线程,并且线程与线程之间数据是共享的(同一内存空间)
    ps:开启线程的开销要远远小于开启进程的开销
    
    如何使用线程?

    创建线程的两种方式:

    方式一:
    t = Thread(target = task ,args = ('enon',))
    t.start()

    from threading import Thread
    import time
    
    def task(name):
    	print('%s is running'%name)
    	time.sleep(3)
    	print('%s is over'%name)
    
    开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
    实例化:
    t = Thread(target = task ,args = ('enon',))
    t.start()#告诉操作系统开辟一个线程  线程的开销远远小于进程
    
    打印结果有个特点:
    egon is running
    主
    egon is over
    #egon先出来意味着创建进程的速度要比代码速度快
    
    egon先出来意味着创建进程的速度要比代码速度快

    方式二:

    通过继承类,来创建。
    t = MyThread('egon')
    t.start()

    
    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('%s is running' % self.name)
            time.sleep(3)
            print('%s is over' % self.name)
    
    
    t = MyThread('egon')
    t.start()#p.start()只是告诉计算机创建一个进程。是否先打印主线程由操作系统来决定
    print('主')
    

    特别提醒

    只是告诉计算机创建一个进程。是否先打印主线程由操作系统来决定

    线程对象及其他方法

    验证多个进程是不是在同一个进程下的

    from threading import Thread, current_thread, active_count
    import time
    import os
    
    def task(name):
        print('%s is running '%name)
        print('zicurrent_thread:',current_thread().name)
        print('zi',os.getpid())
        time.sleep(1)
        print('%s is over'%name)
    
    
    t = Thread(target=task,args = ('egon',))
    t.start()
    print('主')
    print('主current_thread:', current_thread().name)
    print('主', os.getpid())
    '''
    egon is running 
    zicurrent_thread: Thread-1
    zi 9144
    主
    主current_thread: MainThread
    主 9144
    egon is over
    '''
    
    
    from threading import Thread,current_thread,active_count
    import time
    import os
    def task(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    t = Thread(target=task,args=('egon',1))
    t1 = Thread(target=task,args=('jason',2))
    t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t1.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t1.join()  # 主线程等待子线程运行完毕
    print('当前正在活跃的线程数',active_count())
    print('主')
    

    此时的多线程join与最后面的习题一样,关键在于停的哪个线程

    守护线程(线程的守护与进程守护有区别)

    先看守护进程

    主进程守护一个子进程 ,主进程挂了,子进程也就挂了

    p.daemon = True  # 将该进程设置为守护进程   这一句话必须放在start语句之前 否则报错
    p.start()
    time.sleep(0.3)
    print('皇帝jason寿正终寝')
    

    守护线程

    守护线程要想被主线程(线程没有主次之分)结束,必须等待非守护进程的结束,等待的过程中,子进程的代码不会因此停止!

    容易迷惑的小案例

    from threading import Thread
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True#t1睡的时间比t2短,程序实际上走到t2,发现时间还早,因此去跑t1了
        t1.start()
        t2.start()
        print("main-------")
    

    该题的总结:

    在线程中,主线程必须等待非守护线程结束才能结束,此时,daemon并不是将守护的进程杀死,而是继续代码的运行!!!!,如果没有其他子线程,那么执行主线程就结束了
    

    如果代码换成t2,那么打印结果,那么end456不会打印

    
    
    如果没有守护进程daemon那么正常运行所有程序
    
    ## 线程间通信
    这个案例证明:线程之间数据是互通的。在一个内存空间中
    ```python
    from threading import Thread
    money = 666
    
    def task():
        global money
        money = 999
    t= Thread(target=task)
    t.start()
    t.join()
    print(money)
    

    线程互斥锁

    多个线程同时操作一个数据,此时数据就是不安全的。解决方案:加锁

    from threading import Thread,Lock
    import time
    
    
    n = 100
    
    def task(mutex):
        global  n
        mutex.acquire()
        tmp = n
        time.sleep(0.1)
        n = tmp - 1
        mutex.release()
    
    t_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        t.start()
        t_list.append(t)
    for t in t_list:#
        t.join()#必须加这一句,等待一百个进程运行完毕之后再往下走
    print(n)
    
    

    t.join()#必须加这一句,等待一百个进程运行完毕之后再往下走

  • 相关阅读:
    向net core 3.0进击——多平台项目发布与部署
    Linux配置部署_新手向(五)——Docker的安装与使用
    Ubuntu 18.04使用OpenSSL自签证书(证书支持多IP及多域名,谷歌浏览器无警告)
    学习makefile的一个工程示例
    Centos7下设置ceph 12.2.1 (luminous)dashboard UI监控功能
    一个小例子学习makefile
    Centos7下部署ceph 12.2.1 (luminous)集群及RBD使用
    VMware Centos7 桥接 DHCP无法获得IP
    VMware Centos7 NAT 无法上网的解决方法
    jerasure 2.0译文
  • 原文地址:https://www.cnblogs.com/ZDQ1/p/11341862.html
Copyright © 2011-2022 走看看