zoukankan      html  css  js  c++  java
  • python并发编程之进程

    python并发编程之进程

    part 1:

    操作系统基础--I/O操作

    I/O操作:相对内存来说

    • 输入Input输出Output
    • 输入是怎么输入: 键盘input ead ecv
    • 输出是怎么输出: 显示器打印机播放音乐printwritesend
    • 文件操作: read write
    • 网络操作: send recv recvfrom
    • 函数:print input

    计算机的工作分为两个状态

    • CPU工作:做计算(对内存中的数据进行操作)的时候工作
    • CPU不工作:IO操作的时候
    • CPU的工作效率500000条指令/ms

    多道操作系统/分时操作系统

    多道操作系统:一个程序遇到IO就把CPU让给别人

    • 顺序的一个一个执行的思路变成
    • 共同存在在一台计算机中,其中一个程序执行让出cpu之后,另一个程序能继续使用cpu
    • 来提高cpu的利用率+
    • 单纯的切换会不会占用时间:会
    • 但是多道操作系统的原理整体上还是节省了时间,提高了CPU的利用率
      时空复用的概念

    分时操作系统:

    • 单cpu分时操作系统:把时间分成很小很小的段,每一个时间都是一个时间片,
    • 每一个程序轮流执行一个时间片的时间,自己的时间片到了就轮到下一个程序执行--时间片的轮转
    • 没有提高CPU的利用率提高了用户体验
    例子:
        老教授24h全是计算没有io
    	  先来先服务FCFS
    	  研究生5min全是计算没有io
    	  短作业优先
        研究生25min全是计算没有io
    

    并发概念

    • 进程:进行中的程序就是一个进程
      • 占用资源需要操作系统调度
      • pid:能够唯一标识一个进程
      • 计算机中最小的资源分配单位
    • 线程: 首先由进程开辟空间等,线程是执行具体的代码
      • 线程是进程中的一个单位,不能脱离进程存在
      • 线程是计算机中能够被CPU调度的最小单位
    • 并发:
      • 多个程序同时执行:只有一个cpu,多个程序轮流在一个cpu上执行
      • 宏观上:多个程序在同时执行
      • 微观上:多个程序轮流在一个cpu上执行本质上还是串行
    • 并行:
      • 多个程序同时执行,并且同时在多个CPU上执行(能不能利用多核)
    • 同步:
      • 在做A事情的时候发起B件事,必须等待B件事情结束之后才能继续做A件事情
    • 异步:
      • 在做A事的时候发起B时间,不需要等待B事件结束就可以继续A事件
    • 阻塞:
      • 如果CPU不工作 input accept recv recv from sleep connect
    • 非阻塞:
      • CPU工作

    part 2:

    进程的概念

    • 进程的三状态图:
    • windows操作系统下
    • Linux系统操作下:

    multiprocessing模块:

    • 为什么要用if__name__=='main'?
    • 能不能给子进程传递参数?能
    • 能不能获取子进程的返回值?不能
    • 能不能同时开启多个子进程?可以
    • 同步阻塞异步非阻塞
    • 同步阻塞:join()
    • 异步非阻塞:start()
    from  multiprocessing  import  Process
    
    import  os
    import  time
    def  func(name,age):
    	print(f'{name}start')
    	time.sleep(1)
    	print(os.getpid(),os.getppid(),name,age)#pid(进程id):processidppid(父进程id):parentprocessid
    #print('123')
    if  __name__=='__main__':
    	#只会在主进程中执行的所有的代码写在name=main下
    	print('main:',os.getpid(),os.getppid())
    	arg_lst=[('alex',84),('taibai',73),('wusir',96)]
    	for  arg  in  arg_lst:
    		p=Process(target=func,args=arg)#可以给子进程传递参数
    		p.start()#开启子进程,异步非阻塞
    	#p=Process(target=func,args=('haha',23))#可以开启多个子进程
    #p.start()
    

    join()的用法:

    from  multiprocessing  import  Process
    import  os
    import  time
    def  func(name,age):
    	print(f'发送一封邮件给{age}的{name}')
    	time.sleep(1)
    	print('发送完毕')
    if  __name__=="__main__":
    	arg_lst=[('alex',84),('taibai',73),('wusir',96)]
    	p_lst=[]
    	for  arg  in  arg_lst:
    		p=Process(target=func,args=arg)
    		p.start()
    		p_lst.append(p)
    	for  p  in  p_lst:
    		p.join()#阻塞:直到p这个进程执行完毕才继续执行代码
    print('所有邮件已经发送完毕')#要求:这句话最后打印,并且要求上面的能够异步执行
    

    多进程之间的数据是否隔离

    from  multiprocessing  import  Process
    n=0
    def  func():
    	global  n
    	n  +=  1
    
    if  __name__=='__main__':
    	p_l=[]
    	for  i  in  range(50):
    		p=Process(target=func)
    		p.start()
    		p_l.append(p)
    	for  p  in  p_l:
    		p.join()
    print(n)#0对主进程进行了隔离
    

    使用多进程实现一个并发的socketserver

    1. 服务器端:
    import  socket
    from  multiprocessing  import  Process
    
    def  talk(conn):
    	while  True:
    		msg=conn.recv(1024).decode('utf-8')
    		ret=msg.upper().encode('utf-8')
    		conn.send(ret)
    	conn.close()
    
    if  __name__=='__main__':
    	sk = socket.socket()
    	sk.bind(('127.0.0.1',9001))
    	sk.listen()
    	while  True:
    		conn,addr=sk.accept()
    		Process(target=talk,args=(conn,)).start()# args为元组
    sk.close()
    
    1. 客户端:
    import  socket
    import  time
    sk=socket.socket()
    sk.connect(('127.0.0.1',9001))
    while  True:
    	sk.send(b'hello')
    	msg = sk.recv(1024).decode('utf-8')
    	print(msg)
    	time.sleep(0.5)
    
    sk.close()
    

    part 3:

    开启进程的例外一种方式

    • 面向对象的方法,通过继承和重写run方法完成了启动子进程
    • 通过重写init和调用父类的inin完成了给子进程传参数
    import  os
    import  time
    from  multiprocessing  import  Process
    
    class  MyProcess(Process):
    	def  __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
    		self.a=a
    		self.b=b
    		self.c=c
    		super().__init__()#调用上级的__init__方法
    	def  run(self):#实现run的同名方法
    		time.sleep(1)
    		print(os.getppid(),os.getpid(),self.a)
    
    
    if  __name__=='__main__':
    	print('主进程-->',os.getpid())
    	for  i  in  range(10):
    		p=MyProcess(1,2,3)#传参数
    p.start()
    

    Process类的其他方法

    name  pid  ident  daemon(在start之前设置)
    terminate()   is_alive()
    
    
    import  os
    import  time
    from  multiprocessing  import  Process
    
    class  MyProcess(Process):
    	def  __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
    		self.a=a
    		self.b=b
    		self.c=c
    		super().__init__()#调用上级的__init__方法
    	def  run(self):#实现run的同名方法
    		time.sleep(1)
    		print(os.getppid(),os.getpid(),self.a)
    
    
    if  __name__=='__main__':
    	p=MyProcess(1,2,3)
    	p.start()
    	print(p.pid,  p.ident)#查看子进程的pid  22672    22672(同)
    	print(p.name)#查看子进程的名字
    	print(p.is_alive())#查看进程是否活着
    	p.terminate()#强制结束一个子进程异步非阻塞
    	print(p.is_alive())  #True?因为操作系统响应需要一定时间
    	time.sleep(0.01)
    print(p.is_alive())
    

    守护进程

    • 守护进程
      • 在start一个进程之前设置daemon=True
      • 守护进程会等待主进程的代码结束就立即结束
        • 为什么守护进程只守护主进程的代码?而不是等主进程结束之后才结束
        • 因为主进程要最后结束,为了给守护进程回收资源
        • 守护进程会等待其他子进程结束么?不会
    import  time
    from  multiprocessing  import  Process
    
    def  son1():
    	while  True:
    		print('--->inson1')
    		time.sleep(1)
    
    
    def  son2():#10s
    	for  i  in  range(10):
    		print('inson2')
    		time.sleep(1)
    
    
    if  __name__=='__main__':#3s
    	p1=Process(target=son1)
    	p1.daemon=True#表示设置p1是一个守护进程需要在start之前设置
    	p1.start()
    	p2=Process(target=son2)
    	p2.start()
    	time.sleep(3)
    	print('>>>>inmain')
    p2.join()#等待p2结束之后才结束
    
    • 等待p2结束-->主进程的代码才结束-->守护进程结束
    • 主进程会等待所有的子进程结束,是为了回收子进程的资源
    • 守护进程会等待主进程的代码执行结束之后再结束,而不是等待整个主进程结束
    • 主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关

    进程同步---Lock 锁 *****

    • 进程之间数据安全的问题

    锁:

    import  time
    from  multiprocessing  import  Lock,Process
    def  func(i,lock):
    	lock.acquire()#拿钥匙
    	print(f'{i}被锁起来的代码')
    	#time.sleep(1)
    	lock.release()#还钥匙
    
    if  __name__=='__main__':
    	lock=Lock()
    	for  i  in  range(10):
    		p=Process(target=func,args=(i,lock))
    p.start()
    

    抢票的例子:加锁

    import  json
    import  time
    from  multiprocessing  import  Process,Lock
    
    
    def  search(i):
    	#查询余票
    	with  open('ticket',encoding='utf-8')  as  f:
    		ticket=json.load(f)
    	print(f'{i}:当前的余票为{ticket["count"]}张')
    
    
    def  buy_ticket(name):
    	#查询余票
    	with  open('ticket',encoding='utf-8')  as  f:
    		ticket=json.load(f)
    	if  ticket['count']>0:
    		ticket['count'] -= 1
    		print(f'{name}买到票了')
    	time.sleep(0.1)
    	with  open('ticket',mode='w',encoding='utf-8')  as  f:
    		json.dump(ticket,f)
    
    def  get_ticket(i,lock):
    	search(i)
    	with  lock:#代替acquire和release并在此基础上做一些异常处理,保证即便一个进程的代码出错了,也会归还钥匙
    		buy_ticket(i)
    	#lock.acquire()
    	#buy_ticket(i)
    	#lock.release()
    if  __name__=='__main__':
    	lock=Lock()#互斥锁
    	for  i  in  range(10):
    		p=Process(target=get_ticket,args=(i,lock))
    p.start()
    

    为啥叫互斥锁?

    from  multiprocessing  import  Lock#互斥锁不能再同一个进程中连续acquire多次
    lock=Lock()
    lock.acquire()
    print(1)    # 只能打印一个
    lock.acquire()
    print(2)
    

    进程之间数据隔离---队列

    • 进程之间通信(IPC) Inter Process Communication
      • 生产者消费者模型
      • 基于文件:同一台机器上的多个进程之间的通信
        • Queue队列
          • 基于socket的文件级别的通信来完成数据传递的
      • 基于网络:同一台机器或者多台机器上的多进程间通信
        • 第三方工具(消息中间件)
          • memcache
          • redis
          • rabbitmq
          • kafka
    from  multiprocessing  import  Queue, Process
    
    def  pro(q):
    	for  i  in  range(11):
    		print('--->',q.get())
    def  son(q):
    	for  i  in  range(10):
    		q.put(f'hello{i}')
    
    if  __name__=='__main__':
    	q=Queue()
    	Process(target=son,args=(q,)).start()
    Process(target=pro,args=(q,)).start()
    
    • 生产者消费者模型背下来
      • 爬虫的时候
      • 分布式操作:celery
    • 本质:就是让生产数据和消费数据的效率达到平衡并且最大化的效率
    from  multiprocessing  import  Queue,Process
    import  time
    import  random
    
    def  consumer(q,name):#消费者:通常取到数据之后还要进行某些操作
    	while  True:
    		food=q.get()
    		if  food:
    			print(f'{name}吃了{q.get()}')
    		else:
    			break
    
    
    def  producer(q,name,food):#生产者:通常再放数据之前先通过某些代码来获取数据
    	for i in range(10):
    		foodi = f'{food}{i}'
    		print(f'{name}生产了{foodi}')
    		time.sleep(random.random())
    		q.put(foodi)
    
    if __name__ == '__main__':
    	q=Queue()
    	c1=Process(target=consumer,args=(q,'alex'))
    	p1=Process(target=producer,args=(q,'大壮','苹果'))
    	p2=Process(target=producer,args=(q,'b哥','梨子'))
    	c1.start()
    	p1.start()
    	p2.start()
    	p1.join()
    	p2.join()
    q.put(None)#有多少个消费者就put多少个None进来
    
  • 相关阅读:
    pip本地源搭建
    linux 创建 bootable iso 文件
    yum 源本地化 (two)
    linux 网络配置
    linux 设置root可以远程登陆
    察看linux 发行版
    mysql bin-log 设置
    samba 奇怪问题
    delphi中的临界区
    ligerGrid 取得选中行的数据
  • 原文地址:https://www.cnblogs.com/zranguai/p/13822571.html
Copyright © 2011-2022 走看看