zoukankan      html  css  js  c++  java
  • 生产者消费者模型 线程相关

    知识点补充:
    1、操作系统的调度算法:
            时间片轮转法
            多级反馈队列
        2、同步异步(任务的提交方式):
            同步:提交任务之后原地等待任务的返回结果,才能进行下一步操作
            异步:提交任务之后不等待任务的返回结果,直接进行下一步操作
        3、阻塞非阻塞
            阻塞:程序处于阻塞态
            非阻塞:程序处于运行或就绪态
        4、进程理论:
            进程:正在运行的程序
            多道技术:
                    空间上的复用
                        多个程序共用一套硬件设备
                    时间上的复用
                        CPU来回切换:切换+保存状态
    
                        1、程序占用cpu时间过长会被操作系统强行剥夺走cpu的执行权限
                        2、程序的IO操作指的是:
                            input 网络请求(recv accept) time.sleep 文件读写 print
        5、创建进程的本质:在内存中申请一块独立的内存空间,需要申请内存空间,还需要拷贝代码
    

      

    #1、进程间通信
    #队列:先进先出
    #堆栈:先进后出
    #利用队列实现进程间通信
    
    from multiprocessing import Queue
    q = Queue(5)   #里面的参数代表最多放5个数据
    q.put(1)       #往队列里放数据
    q.put(1)
    q.put(1)
    q.put(1)
    print(q.full())     #判断队列是否放满了,此时显示False,还有一个位置没放数据
    q.put_nowait(2)    #往队列里放数据,
    # q.put_nowait(2)    #当超过队列里允许放置数据个数的最大值以后,系统会报错
    # q.put(1)          #当超过最大值,此时不会报错,只是程序会卡住,不会继续运行
    
    
    q.get()       #从队列里取数据
    q.get()
    q.get()
    print(q.empty())  #当前的队列里面是否还存在数据,此时显示False
    q.get()
    q.get_nowait()    #从队列里取值,因为还存在数据,所以正常取值
    # q.get_nowait()   #因为队列里面的值全部取完了,所以这次系统会报错
    # q.get()          #队列为空,get会在原地等待队列中放置数据
    print(q.empty())  #此时显示True,队列里面所存的数据全部被取出
    

      

    #2、基于队列实现进程间通信
    from multiprocessing import Queue,Process
    
    def producer(q):
        q.put('我是造数据的')
    
    def consumer(q):
        print(q.get())
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,))
        p2 = Process(target=consumer,args=(q,))
        p1.start()
        p2.start()
    

      

    #3、生产者消费者模型
    
    生产者:生成数据
           消费者:处理数据
           解决供需不平衡的问题:
               定义一个队列,用来存放固定数量的数据
               解决生产者和消费者不需直接打交道,双方通过队列实现数据传输的问题
           Queen,JoinableQueue :管道+锁,后者稍微高级一些
    
    
    from multiprocessing import JoinableQueue,Process
    import random,time
    def producer(name,food,q):
        for i in range(5):
            data = '%s拉出了%s,%s'%(name,food,i)
            time.sleep(random.randint(1,3))
            print(data)
            q.put(data)   #将数据放入队列中
    
    def consumer(name,q):
        while True:
            data = q.get()
            time.sleep(random.randint(1,3))
            print('%s消化了%s'%(name,data))
            q.task_done()  #告诉队列,已经将数据取出并消化完毕
    
    if __name__ == '__main__':
        q = JoinableQueue()   #生产一个队列对象
        a1 = Process(target=producer,args=('tank','披萨',q))
        a2 = Process(target=producer,args=('egon','面包',q))
        b1 = Process(target=consumer,args=('owen',q))
        b2 = Process(target=consumer,args=('fuck',q))
        a1.start()
        a2.start()
        b1.daemon = True
        b2.daemon = True
        b1.start()
        b2.start()
        #等待生产者生产完所有的数据
        a1.join()
        a2.join()
        #等待队列里的数据全部取出
        q.join()
        print('主进程')
        
    #可能是因为3.7版本的问题,导致终端输出的时候,多行数据再同一行打印,剩下一些空行???
    

      

    #4、线程
    
    1、什么是线程
        进程:资源单位
        线程:执行单位
        二者的联系:每一个进程中会自带一个线程
    2、线程存在的必要性
        开启一个进程:
            申请内存空间  耗时
            将代码拷贝到申请的内存空间中  耗时
        开启线程:
            不需要申请内存空间
        因此,开启线程的开销远远小于开进程!    
    

      

    5、开启线程的方式
    #方式一: 通过函数
    
    from threading import Thread
    import time
    def task(name):
        print('%s is running'%name)
        time.sleep(1)
        print('%s is over'%name)
    
    if __name__ == '__main__':  #因为线程不需要加载模块,所以这一步操作可有可无
        t = Thread(target=task,args=('michael',))
        t.start()  #开启线程的速度非常快,几乎代码执行完线程就已经开启
        # time.sleep(0.5)
        print('主线程')
    
    
    #方式二: 类的继承
    
    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self,name):
            #涉及到父类方法的重写,最好先调用super()
            super().__init__()
            self.name = name
        def run(self):
            print('%s is running'%self.name)
            time.sleep(1)
            print('%s is over'%self.name)
    
    t = MyThread('michael')
    t.start()
    print('主线程')
    

      

    #6、线程之间数据共享
    
    from threading import Thread
    import os
    x = 100
    def task():
        global x
        x=99
        print('子线程所对应的进程的pid',os.getpid())  #7280
    t = Thread(target=task)
    t.start()
    t.join() #存在有无,都没有影响
    print(x)   #99
    print('主线程所对应进程的pid',os.getpid())   #7280
    
    #上述的操作可以说明这几点:
        # 1、线程之间共享的是进程的数据
        # 2、线程本身是不存在pid的,存在的是上一级进程的pid
    
    #7、线程互斥锁
    from threading import Thread,Lock
    import time
    import random
    
    
    #下面为没有加锁的,导致所有线程拿到的数据是同一份,没有达到预期的效果
    
    n = 10
    def task():
        global n
        tmp = n
        time.sleep(0.1)  #存在延迟,导致所有线程拿到的n都是10,处理以后所有的返回值都是9
        #若不存在延迟,最后的结果为n=0,  所有的线程都处理了数据,具体过程日后再谈  ????
        n = tmp - 1
    
    t_list = []
    for i in range(10):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    
    for t in t_list:
        t.join()    #让主线程等待10个子线程结束以后再运行自身
    print(n)  #n = 9
    
    
    
    #加锁
    mutex = Lock()
    n = 10
    def task():
        global n
        mutex.acquire()  #加锁   让线程从并行改变为了串行
        tmp = n
        time.sleep(0.1)
        n = tmp - 1
        mutex.release()  #释放锁   一个线程结束以后,其他的线程开始了竞争
    t_list = []
    for i in range(10):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    
    for t  in t_list:
        t.join()
    print(n)  #n=0  
    

      

    #8、线程的其他属性与方法
    
    from threading import Thread,active_count,current_thread
    
    def task(name):
        print('%s is running'%name)
        time.sleep(1)
        print('%s is over'%name)
    
    t1 = Thread(target=task,args=('michael',))
    t2 = Thread(target=task,args=('michael',))
    t1.start()
    t2.start()
    print(active_count())  #当前存活的线程数,加上主线程  3
    print(current_thread())  #产生了一个对象
    t1.join()
    t2.join()
    print(active_count())   #回收了两个子线程,还剩下一个主线程   1
    

      

    #9、守护进程
    from threading import Thread,active_count
    import time
    def task(name):
        print('%s is running'%naem)
        time.sleep(0.1)
        print('%s is  over'%name)
    
    t = Thread(target=task,args=('michael',))
    t.daemon = True  #主线程结束,守护线程也会立刻结束,然而在这场景,没什么具体的用处
    t.start()
    print(active_count())  #没有延迟时,理论上为1
    print('主')
    print(active_count()) #没有延迟时,理论上为1   可能是受限于计算机硬件的能力,软件比硬件发展快好多
    

      

      

  • 相关阅读:
    统计中的f检验和t检验的区别
    统计中的t检验
    降维工具箱drtool
    pca主成份分析方法
    经典相关分析,典型关分析, CCA,Canonical Correlation Analysis,多元变量分析,线性组合,相关系数最大化
    博客中插入公式——之在线数学公式生成
    Mathtype常用快捷键
    下标运算符重载
    赋值运算符的重载
    算术和关系运算符重载
  • 原文地址:https://www.cnblogs.com/changwenjun-666/p/10826675.html
Copyright © 2011-2022 走看看