zoukankan      html  css  js  c++  java
  • 并发编程

    计算机基础

    io操作:

    复制代码
     i input  输入 相对内存 进入内存
        # read
        # recv
        # input 
        # load
    o output 输出 相对内存 从内存出去
        # write
        # send
        # dump
    复制代码

    多道操作系统:

    • 提高了cpu的利用率
    • 第一次出现了一个概念:任务状态的保存
    • 数据的隔离的概念(由于同时在执行的多个程序之间的数据不能混为一谈)

    单处理机系统中的多道程序运行是的特点:

    • 多道:计算机内存中同时存放几道相互独立的程序
    • 宏观上并行:同时进入系统的几道相互独立的程序
    • 微观上串行:实际上,各道程序轮流的用cpu,并交替运行

    进程:

    • 是计算机中资源分配的最小单位
    • 并且进程与进程之间的数据是隔离的

     

     

     

    同步和异步:

    • 同步:有几件事情,先做一件,做完一件在做一件
    • 异步:有几件事情,同时完成
    • 阻塞:程序由于不符合某个条件或者要等待某个条件满足在一个某一个地方进入等待状态
    • 非阻塞:所有不阻塞的程序
    • 如果想让阻塞的程序不阻塞,就调用setblocking()

    同步阻塞非阻塞和异步阻塞非阻塞:

     View Code

    进程模块

    进程id:

    • os.getpid()  ---->获取当前进程号
    • os.getppid() ----->获取当期父进程号

    在python代码中开启子进程:

     开启子进程

    进程中的一些操作:

    • 子进程对于主进程中的全局变量的修改是不生效的
    • join的功能就是阻塞  只有一个条件是能够让我继续执行 这个条件就是子进程结束

     join操作的扩展:

    • join(timeout = 时间)  如果没有写这个参数 join会一直阻塞到子进程结束 如果设置的超市时间,那么意味着如果不足时间(你自己写的)子进程结束了,程序结束阻塞,如果超过时间还没有结束,那么也结束阻塞
    • terminate() 也可以强制结束一个子进程

    守护进程:

    • 设置子进程为守护进程,守护进程会随着主进程的代码结束而结束
    • 由于主进程要负责给所有的子进程收尸,所以主进程必须是最后结束,守护进程只能在主进程的代码结束之后就认为主进程结束了
    • 守护进程在主进程的代码结束之后就结束了,不会等待其他子进程结束
    • 守护进程会等待主进程的代码结束而结束,不会等待其他子进程的结束
    • 要想守护进程等待其他子进程,只需要在主进程中加上join

    操作多个子进程的结束和join阻塞:

    复制代码
    # lst = []
    # for i in range(n):
    #     p = Process()
    #     lst.append(p)
    #     p.start()
    # for p in lst:
        # p.join()
        # p.terminate()
    复制代码

    开启子进程的另一种方式:

    复制代码
    import os
    from multiprocessing import Process
    class Myprocess(Process):
        def __init__(self,参数):
            super().__init__()
            self.一个属性 = 参数
        def run(self):
            print(子进程中要执行的代码)
    if __name__ == '__main__':
        conn = "一个链接"
        mp = Myprocess(conn)
        mp.start()
    复制代码

    锁的概念:

    复制代码
    import json
    import time
    from multiprocessing import Process,Lock
    
    def search(name):
        '''查询余票的功能'''
        with open('ticket') as f:
            dic = json.load(f)
            print(name , dic['count'])
    
    def buy(name):
        with open('ticket') as f:
            dic = json.load(f)
        time.sleep(0.1)
        if dic['count'] > 0:
            print(name,'买到票了')
            dic['count'] -=1
        time.sleep(0.1)
        with open('ticket','w') as f:
            json.dump(dic,f)
    
    def get_ticket(name,lock):
        search(name)
        lock.acquire()  # 只有第一个到达的进程才能获取锁,剩下的其他人都需要在这里阻塞
        buy(name)
        lock.release()  # 有一个人还锁,会有一个人再结束阻塞拿到钥匙
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=get_ticket,args=('name%s'%i,lock))
            p.start()
    复制代码

     信号量:

    • 对于锁  保证一段代码同一时刻只能有一个进程执行
    • 对于信号量 保证一段代码同一时刻只能有n个进程执行
    • 信号量 :锁+计数器实现的
     关于信号量的事例

    事件:

    • Event  事件类
    • 事件本身就带着标识:False
    • wait 阻塞 它的阻塞条件是 对象标识为False 结束阻塞条件是True
    • 与对象标识相关的
      • set 将对象的标识设置为True
      • clear 将对象的标识设置为False
      • is_set 查看对象的标识是否为True

     

    复制代码
    # import time
    # import random
    # from multiprocessing import Event,Process
    # def traffic_light(e):
    #     print('33[1;31m红灯亮33[0m')
    #     while True:
    #         time.sleep(2)
    #         if e.is_set():   # 如果当前是绿灯
    #             print('33[1;31m红灯亮33[0m') # 先打印红灯亮
    #             e.clear()                        # 再把灯改成红色
    #         else :           # 当前是红灯
    #             print('33[1;32m绿灯亮33[0m') # 先打印绿灯亮
    #             e.set()                          # 再把灯变绿色
    #
    # def car(e,carname):
    #     if not e.is_set():
    #         print('%s正在等待通过'%carname)
    #         e.wait()
    #     print('%s正在通过'%carname)
    #
    # if __name__ == '__main__':
    #     e = Event()
    #     p = Process(target=traffic_light,args = (e,))
    #     p.start()
    #     for i in range(100):
    #         time.sleep(random.randrange(0,3))
    #         p = Process(target=car, args=(e,'car%s'%i))
    #         p.start()
    复制代码

    进程之间的通信(IPC):

    • 进程间的通信 Inter-Process Communication
    • 实现进程之间通信的两种机制: 1:管道 Pipe 2.队列:Queue  
    复制代码
    from multiprocessing import Queue,Process
    def  consumer(q):
        print('子进程':,q.get())
    if __name__ == '__main__':
        q = Queue()
        p = Process(target = consumer,args = (q,))
        p.start()
        q.put("hello,world")
    复制代码

    生产者消费者模型:

    复制代码
    import time
    from multiprocessing import Queue,Process
    
    def producer(name,food,num,q):
        '''生产者'''
        for i in range(num):
            time.sleep(0.3)
            foodi = food + str(i)
            print('%s生产了%s'%(name,foodi))
            q.put(foodi)
    
    def consumer(name,q):
        while True:
            food = q.get()   # 等待接收数据
            if food == None:break
            print('%s吃了%s'%(name,food))
            time.sleep(1)
    
    if __name__ == '__main__':
        q = Queue(maxsize=10)
        p1 = Process(target=producer,args = ('宝元','泔水',20,q))
        p2 = Process(target=producer,args = ('战山','鱼刺',10,q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('wusir', q))
        p1.start()   # 开始生产
        p2.start()   # 开始生产
        c1.start()
        c2.start()
        p1.join()    # 生产者结束生产了
        p2.join()    # 生产者结束生产了
        q.put(None)  # put None 操作永远放在所有的生产者结束生产之后
        q.put(None) 
    复制代码

    joinablequeue实现的生产者消费者模型:

    复制代码
    import  time
    from multiprocessing import JoinableQueue,Process
    
    def consumer(name,q):
        while True:
            food = q.get()
            time.sleep(1)
            print('%s消费了%s'%(name,food))
            q.task_done()
    
    def producer(name,food,num,q):
        '''生产者'''
        for i in range(num):
            time.sleep(0.3)
            foodi = food + str(i)
            print('%s生产了%s'%(name,foodi))
            q.put(foodi)
        q.join()   # 消费者消费完毕之后会结束阻塞
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer, args=('宝元', '泔水', 20, q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('wusir', q))
        c1.daemon = True
        c2.daemon = True
        p1.start()
        c1.start()
        c2.start()
        p1.join()
    复制代码

    进程之间的数据共享:

    复制代码
    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        #     d['count']-=1
        lock.acquire()
        d['count'] -= 1
        lock.release()
    
    if __name__ == '__main__':
        lock=Lock()
        m = Manager()
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
    复制代码
    • Manager是一个类 内部有一些数据类型能够实现数据类型能够实现进程之间的数据共享dict,list这样的数据 内部的数字进行自加 自减 是会引起数据不安全的,这种情况下需要我们手动加锁完成 因此 我们一般情况下 不适用这种方式来进行进程之间的通信 我们宁可使用Queue队列或者其他消息中间件来实现消息的传递 保证数据的安全性

    进程池:

    • 为什么要使用进程池

      • 任务很多 cpu个数*五个任务以上

      • 为了节省创建和销毁进程的时间和操作系统的资源
    • 一般进程池中进程的个数:

      • cpu的1-2倍
      • 如果是高计算,完全没有io和,那么就用cpu的个数
      • 随着io操作越多,可能池中的进程个数也可以相应增加 
  • 相关阅读:
    dp,路径保存,最大公共上升子序列——ZOJ
    简单dp——HDU
    扫描线,线段树,离散化——HDU
    c++版本的opencv3.4.1里分类器输出值异常
    win10家庭版openssh连接远程服务器显示connetion reset
    生活感悟一
    Teradata简介
    kettle 无法正常启动问题
    kettle实现同构单表增量同步
    kettle数据库连接使用变量
  • 原文地址:https://www.cnblogs.com/hard-up/p/10099287.html
Copyright © 2011-2022 走看看