zoukankan      html  css  js  c++  java
  • 并发编程 进程基础

    操作系统

    • 多道 、分时、实时

    • 同步异步

      • 同步:一件事情完成后再做另一件事
      • 异步:同时做多件事
    • 阻塞和非阻塞

      • 阻塞:recv,accept,recvfrom
        • 会让整个进程进入阻塞队列
      • 非阻塞:进程只会在就绪和 运行状态中切换
    • 进程三状态:就绪 运行 阻塞

    • 并发并行

      • 并发是包含并行的
      • 并发:宏观上多个程序同时运行,实际是同一时间只运运行了一次
      • 并行:微观上多个程序同时运行
    • 子进程和主进程

      • pid ppid
    • 多并发的tcp服务端

      • import socket
        from multiprocessing import Process
        def communicate(conn):
            while True:
                conn.send("hello".encode("utf-8"))
                print(conn.recv(1024))
        if __name__ == '__main__':
            sk = socket.socket()
            sk.bind(('127.0.0.1',9001))
            sk.listen()
            while True:
                conn,addr = sk.accept()
                Process(target=communicate,args=(conn,)).start()
        
        import socket
        sk = socket.socket()
        sk.connect(('127.0.0.1',9001))
        while True:
            print(sk.recv(1024))
            mv = input(">>>>>>>>>>:").strip()
            sk.send(mv.encode("utf-8"))
        
    • 进程是操作系统中最小的资源分配单位

    • 进程

      • multiprocessing
      • multiprocessing.Process
      • 如何开启一个子进程
    • Process 开启子进程

      • 第二种开启子进程的方式

        • def func(index):
              time.sleep(random.random())
              print('第%s个邮件已经发送完毕'%index)
          if __name__ == '__main__':
              p_lst = []
              for i in range(10):
                  p = Process(target=func,args=(i,))
                  p.start()
                  p_lst.append(p)
              for p in p_lst:
                  p.join()
              print('全部发送完毕')
          
      • join控制子进程

        • #子进程同步,执行完毕后才执行主程序后面的程序
          # import time
          # from multiprocessing import Process
          # def f(name):
          #     print("hello",name)
          #     time.sleep(1)
          # if __name__ == '__main__':
          #     for i in range(5):
          #         p = Process(target=f,args=(i,))
          #         p.start()
          #         p.join()      #阻塞,
          #     print("主进程执行")
          
          #子程序异步执行,执行完了阻塞结束
          import time
          from multiprocessing import Process
          def f(name):
              print("hello",name)
              time.sleep(1)
          if __name__ == '__main__':
              p_list = []
              for i in range(10):
                  p = Process(target=f,args=(i,))
                  p.start()
                  p_list.append(p)
              for i in p_list:
                  i.join()
              print("主进程执行完毕")
          
      • 守护进程 daemon

        • 守护进程会随着主进程代码执行完毕而结束

        • 守护进程内无法再开启子进程,否则会抛出异常

        • 注意:进程之间是相互独立的,主进程代码运行结束,守护进程也会随即终止

        • import time
          from multiprocessing import Process
          def func1():
              count = 1
              while True:
                  time.sleep(0.5)
                  print(count*"*")
                  count += 1
          def func2():
              print("func strat")
              time.sleep(5)
              print("func2 end")
          if __name__ == '__main__':
              p1 = Process(target=func1)
              p1.daemon = True      #定义为守护进程
              p1.start()          #执行
              Process(target=func2).start()
              time.sleep(3)
              print("主进程")
          #输出
          # func strat
          # *
          # **
          # ***
          # ****
          # *****
          # 主进程
          # func2 end
          

          如果主进程执行完毕那么守护进程也会结束,但是其他子进程如果没执行完还会继续执行

      • 作业:在进程之间保证数据安全性

      • from multiprocessing import Process,Lock

      • lock= Lock()实例对象

      • lock.acquire() 取钥匙开门

      • lock.release() 关门放钥匙

      • 例题 模拟抢票

      • import time
        import json
        from multiprocessing import Process,Lock
        def search(person):         #查票
            with open("ticket") as f:   #文件中保存着一个字典{"count":4}
                dic = json.load(f)   #读出文件中的字典
            time.sleep(0.2)
            print("%s查询余票"%person,dic["count"])
        def get_ticket(person):         #抢票
            with open("ticket") as f:
                dic = json.load(f)
            time.sleep(0.2)             #模拟延迟
            if dic["count"] >0:
                print("%s买到票了"%person)
                dic["count"] -= 1
                time.sleep(0.2)
                with open("ticket","w") as f:
                    json.dump(dic,f)    #写回文件
            else:
                print("%s没买到票"%person)
        def ticket(person,lock):
            search(person)
            lock.acquire()      #开门,一次只能进一个
            get_ticket(person)
            lock.release()      #关门
        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                p = Process(target=ticket,args=("person%s"%i,lock))
                p.start()
        

        为了保证数据的安全,在异步的情况下,多个进程又可能同时修改同一份数据的时候,需要给这个数据上锁

      • 加锁的作用

        • 降低了程序的效率,让原来能够同时执行的代码编程顺序执行了,异步变同步的过程,保证了数据的安全
    • 同步控制

      • import time
        from multiprocessing import Process,Lock
        def func(num,lock):
            time.sleep(1)
            print("异步执行",num)
            lock.acquire()
            time.sleep(0.5)
            print("同步执行",num)
            lock.release()      #同步执行是依次执行,间隔0.5秒
        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                p = Process(target=func,args=(i,lock))
                p.start()
        
    • 信号量 机制:计数器+锁实现的 Semaphore

      • 主程序控制一定数量的子程序同时执行,这些数量的子程序执行完一个就会有下一个子程序补充进来

      • import time
        import random
        from multiprocessing import Process,Semaphore
        def ktv(person,sem):
            sem.acquire()       #进
            print("%s走进KTV"%person)
            time.sleep(random.randint(1,3))     #随机延迟一到三秒
            print("%s走出ktv"%person)
            sem.release()       #出
        if __name__ == '__main__':
            sem = Semaphore(4)      #信号量为4,默认为1
            for i in range(10):
                Process(target=ktv,args=(i,sem)).start()
        
    • 事件 Event

      • 阻塞事件 wait() 方法

        • wait 是否阻塞是看event对象你不的一个属性
      • 控制这个属性的值

        • set()将这个属性的值改成True

        • clear() 将这个属性的值改成False

        • is_set() 判断当前属性是否为True

        • #模拟红绿灯,只有全部车通过后才停止
          import time
          import random
          from multiprocessing import Process,Event
          def traffic_light(e):
              print("红灯亮")
              while True:
                  if e.is_set():
                      time.sleep(2)
                      print("红灯亮")
                      e.clear()
                  else:
                      time.sleep(2)
                      print("绿灯亮")
                      e.set()
          def car(e,i):
              if not e.is_set():
                  print("car%s在等待"%i)
                  e.wait()
              print("car%s通过了"%i)
          if __name__ == '__main__':
              e = Event()
              p = Process(target=traffic_light,args=(e,))
              p.daemon =True    #变成守护进程
              p.start()
              p_list = []
              for i in range(10):
                  time.sleep(random.randrange(0,3,2))
                  p = Process(target=car,args=(e,i))
                  p.start()
                  p_list.append(p)
              for p in p_list:p.join()
          
    • 进程之间的通信(IPC)
      • 多个进程之间有一些固定的通信内容

      • socket给予文件家族通信

      • 进程之间虽然内存不共享,但是可以通信,

      • 进程队列 Queue
        • 进程之间数据是安全的,先进先出
      • 队列是基于管道 + 锁 实现的

      • 管道(Pipe)是基于socket,pickle实现的

      • def consume(q):
            print('son-->',q.get())
            q.put('abc')
        if __name__ == '__main__':
            q = Queue()
            p = Process(target=consume,args=(q,))
            p.start()
            q.put({'123':123})
            p.join()
            print('Foo-->',q.get())
        
      • 简单的生产消费模型

        def consume(q):
            print('son-->',q.get())
            q.put('abc')
        if __name__ == '__main__':
            q = Queue()
            p = Process(target=consume,args=(q,))
            p.start()
            q.put({'123':123})
            p.join()
            print('Foo-->',q.get())
        
      • 相同的原理 JoinableQueue

        • task_done 通知队列已经有一个数据被处理了

        • q.join() 阻塞直到放入队列中所有的数据都被处理掉(有多少个数据就接受到多少taskdone)

        • import time
          import random
          from multiprocessing import Process,JoinableQueue
          def consumer(q,name):
              while True:
                  food = q.get()
                  time.sleep(random.uniform(0.3,0.8))
                  print("%s吃了一个%s"%(name,food))
                  q.task_done()
          def producer(q,name,food):
              for i in range(10):
                  time.sleep(random.uniform(0.3,0.8))
                  print("%s生产了%s%s"%(name,food,i))
                  q.put(food+str(i))
          if __name__ == '__main__':
              jq = JoinableQueue()
              c1 = Process(target=consumer,args=(jq,"alex"))
              c1.daemon = True
              p1 = Process(target=producer,args=(jq,"libai","包子"))
              c1.start()
              p1.start()
              p1.join()
              jq.join()
          
    • 管道 进程之间数据不安全 且存取数据复杂

    • 进程池(Pool) multiprocessing起进程池

      • 进程池开启的个数:默认是CPU的个数

      • 开启过多的进程并不能提高你的效率,反而会降低效率

      • 计算密集型 充分占用CPU 多进程可以充分利用多核 适合开启多进程,但是不适合开启很多多进程

      • IO密集型 大部分时间都在阻塞队列,而不是在运行状态 根本不太适合开启多进程

      • 提交任务:

        • 1.同步提交 apply

          • 返回值:子进程对应函数的返回值

          • 一个一个顺序执行的,并没有任何的并发效果

          • # import os
            # import time
            # from multiprocessing import Process,Pool
            # def task(num):
            #     time.sleep(0.5)
            #     print("%s: %s"%(num,os.getpid()))
            #     return num ** 2
            # if __name__ == '__main__':
            #     p = Pool(4)
            #     for i in range(20):
            #         res = p.apply(task,args=(i,)) #apply   提交任务方法,同步提交
            #         print("--->",res)
            #四个任务依次执行,轮换
            
        • 2.异步提交 apply_async

          • 没有返回值,要想所有任务能够顺利的执行完毕
            • p.close()
            • p.join() 必须先close在join,阻塞直到进程池中所有任务都执行完毕
          • 有返回值的情况下
            • res.get() #get不能再提交任务之后立刻执行,应该是先提交所有的任务再通过get获取结果
      - ```Python
        import os
        import time
        from multiprocessing import Pool
        def task(num):
            time.sleep(1)
            print("%s: %s"%(num,os.getpid()))
            return num **2
        if __name__ == '__main__':
            p = Pool(4)
            for i in range(20):
                res = p.apply_async(task,args=(i,))     #apply_async   异步提交
            p.close()
            p.join()
        #输出结果同时四个认识执行
        ```
    
    - 3.map()方法
    
      - 异步提交的简化版本
      - 自带close和join方法
      - 直接拿到返回值的可迭代对象
      - 循环可以拿到返回值
    
    • 数据共享 Manager

      • 把所有实现了数据共享的比较便捷的类都重新又封装了一遍,并且在原有的multiprocessing基础上

      • 支持的数据类型有限

      • list dict都不是安全的数据,你需要自己加锁来保证数据的安全

      • from multiprocessing import Manager,Process,Lock
        def work(d,lock):
            with lock:
                d["count"] -= 1
        if __name__ == '__main__':
            lock = Lock()
            with Manager() as m:            #使用之后数据就会变成共享
                dic = m.dict({"count":100})
                p_l = []
                for i in range(100):
                    p = Process(target=work,args=(dic,lock))
                    p_l.append(p)
                    p.start()
                for p in p_l:
                    p.join()
                print(dic)
        
    • 进程池-----回调函数

      • 当func执行完毕后执行callback函数

      • func的返回值作为callback的参数

      • 回调函数是在主进程实现的

      • 子进程有大量的计算要去做,回调函数等待结果做简单处理

      • import os
        from multiprocessing import Pool
        def func(i):
            print("第一个任务",os.getpid())      
            return "*"*i
        def call_back(res):
            print("回调函数",os.getpid())       ##pid号为11420  与主进程pid号相同
            print("res---->",res)
        if __name__ == '__main__':
            p = Pool()
            print("主进程",os.getpid())        #pid为11420   说明回调函数是主进程实现的
            p.apply_async(func,args=(1,),callback=call_back)
            p.close()
            p.join()
        
        #基于多进程的共享数据的小爬虫
        import re
        from urllib.request import urlopen
        url_lst = [
            'http://www.baidu.com',
            'http://www.sohu.com',
            'http://www.sogou.com',
            'http://www.4399.com',
            'http://www.cnblogs.com',
        ]
        from multiprocessing import Pool
        def get_url(url):
            response = urlopen(url)
            ret = re.search("www.(.*?).com",url)
        
            print("%s finished"%ret.group(1),ret.group())
            return ret.group(1),response.read()
        
        def call(content):
            url,con = content
            with open(url+".html","wb") as f:
                f.write(con)
                
        if __name__ == '__main__':
            p = Pool()
            for url in url_lst:
                p.apply_async(get_url,args=(url,),callback=call)
            p.close()
            p.join()
        
  • 相关阅读:
    电脑缺少网卡驱动不能上网
    eclipse的package, folder, source folder 异同以及相互转化
    向数据库表插入查询的数据
    更换项目jdk版本
    linux安装jdk(非rpm命令)
    主机ping不同虚拟机
    如何实现VoIP中大并发应用
    简单设置几个参数让你的电脑无人可染指(只有你能用)
    aliyun阿里云Maven仓库地址——加速你的maven构建
    nodejs持续学习--必须关注4网站
  • 原文地址:https://www.cnblogs.com/yuncong/p/9683881.html
Copyright © 2011-2022 走看看