zoukankan      html  css  js  c++  java
  • nine day-并发编程

    一、操作系统概念

     1 并发编程:
     2 一个程序 可以在同一时刻做多件事情
     3 解决程序中的IO操作影响程序效率的问题
     4 
     5 操作系统概念、来源
     6 输入输出--大部分时间都不会占用CPU,且会降低你程序的效率
     7 # input/print
     8 # 文件 读数据、取数据
     9 # 网络操作 : 往网线上 发送请求 写
    10 #            读 从网络上获取数据
    11 多道程序系统
    12 分时操作系统:通过时间片轮转算法去调度作业
    13 
    14 通用操作系统:
    15 
    16 现代操作系统:
    17 基于多道批处理系统和分时系统
    18 # 多个程序、作业在遇到IO操作的时候,操作系统会帮助你进行切换
    19 # 让CPU的利用率得到最大的提高
    操作系统概念

    二、进程初识

    进程:运行中的程序
    操作系统只负责管理调度进程
    进程是操作系统中资源分配的最小单位;--记住
    每一个运行中的程序都需要有自己的内存、资源,都分配给进程
    记录执行的状态,管理自己的内存资源

    在Python中,每一个运行中的程序,都是一个进程
    一个进程,就能做一件事
    如果有多个进程---就可以做多个事儿
    multiprocess 例子 异步同步
     1 import os
     2 import time
     3 # print(os.getpgid())
     4 # time.sleep(100)
     5 from multiprocessing import Process
     6 # def func(num):
     7 #     print(num,os.getpid())
     8 #     time.sleep(100)
     9 #
    10 # if __name__=='__main__': #windows需要增加,创建子进程import父进程内容不断引入
    11 #     print( os.getpid())
    12 #     p=Process(target=func,args=(10,)) #创造了一个进程,必须是元祖传参
    13 #     p.start() #开启进程    又申请了一个进程,与上面函数中不一致
    14 #     print(os.getpid()) #1中的
    15 # 2528
    16 # 2528 为何先打印
    17 # 10 7356   --- 开启进程是有时间开销的
    实例

    异步可以有效地提高程序的效率
    进程与进程之间都是异步的
    开启进程是有时间开销的

    几个概念:主进程、子、父
    三、进程进阶
    进程进阶:
    # 什么是进程 : 运行中的程序,计算机中最小的资源分配单位
    # 程序开始执行就会产生一个主进程
    # python中可以主进程中用代码启动一个进程 —— 子进程
    # 同时主进程也被称为父进程
    # 父子进程之间的代码执行是异步的,各自执行自己的
    # 父子进程之间的数据不可以共享
    # 主进程会等待子进程结束之后再结束

    父子进程之间的数据不可以共享--例子及说明
    n = 100
    def func():
        global n
        n = 0
        print('_______')
        time.sleep(10)
    
    # func()
    if __name__ == '__main__':
        Process(target=func).start()
        time.sleep(1)
        print(n)
    不共享例子

    进程结束:主进程会等待子进程结束之后再结束

    开启多个子进程:重要!
    例子 发邮件
     1 def func(num):
     2     time.sleep(1)  #阻塞,阻塞状态;1s之后全部随机的打印出来
     3     print('_'*num)
     4 
     5 # if __name__=='__main__':
     6     # for i in range(10):
     7     #     Process(target=func, args=(i,)).start()
     8     # print('over')
     9     #
    10     # Process(target=func, args=(1,)).start()
    11     # Process(target=func, args=(2,)).start()
    12     # Process(target=func, args=(3,)).start()
    13     # Process(target=func, args=(4,)).start()
    14 if __name__=='__main__':
    15     for i in range(10):
    16         p=Process(target=func, args=(i,))
    17         p.start()
    18         p.join() #阻塞直到子进程执行完毕之后再继续,但每次打印都阻塞了。优化,逐一join
    19     print('over')
    引入
     1 def func(num):
     2     time.sleep(1)  #阻塞,阻塞状态;1s之后全部随机的打印出来
     3     print('_'*num)
     4 if __name__=='__main__':
     5     l=[]
     6     for i in range(10):
     7         p=Process(target=func, args=(i,))
     8         p.start()
     9         l.append(p)
    10     for p in l:
    11         p.join()
    12     print('十条信息已经都发送over')
    开启多个子进程--常用版

    守护进程:守护进程也是一个子进程-------一般报活用
    当主进程的代码执行完毕后自动结束的子进程叫做守护进程
    打印6次--6次--21次例子,易考到
     1 def deamon_func():
     2     while True:
     3         print('life')
     4         time.sleep(0.5)
     5 if __name__=='__main__':
     6     p=Process(target=deamon_func)
     7     p.daemon=True  #守护进程必备
     8     p.start()
     9     for i in range(3):
    10         print(i*'*')
    11         time.sleep(1)
    12 
    13 
    14 结果:
    15 
    16 life
    17 life
    18 *
    19 life
    20 life
    21 **
    22 life
    23 life
    6
     1 import time
     2 def deamon_func():
     3     while True:
     4         print('我还活着')
     5         time.sleep(0.5)
     6 
     7 def wahaha():
     8     for i in range(10):
     9         time.sleep(1)
    10         print(i * '#')
    11 
    12 if __name__ == '__main__' :
    13     p2 = Process(target=wahaha)
    14     p2.start()
    15     p = Process(target=deamon_func)
    16     p.daemon = True  #守护进程必备
    17     p.start()
    18     for i in range(3):
    19         print(i*'*')
    20         time.sleep(1)
    21     p2.join()  #不加6次,加上21次
    22 
    23 结果:
    24 
    25 我还活着
    26 我还活着
    27 *
    28 我还活着
    29 
    30 我还活着
    31 **
    32 我还活着
    33 #
    34 我还活着
    35 ##
    36 我还活着
    37 我还活着
    38 我还活着
    39 ###
    40 我还活着
    41 我还活着
    42 ####
    43 我还活着
    44 我还活着
    45 #####
    46 我还活着
    47 我还活着
    48 ######
    49 我还活着
    50 我还活着
    51 #######
    52 我还活着
    53 我还活着
    54 ########
    55 我还活着
    56 我还活着
    57 #########
    58 
    59 不加join结果:
    60 
    61 我还活着
    62 我还活着
    63 *
    64 
    65 我还活着
    66 我还活着
    67 **
    68 #
    69 我还活着
    70 我还活着
    71 ##
    72 ###
    73 ####
    74 #####
    75 ######
    76 #######
    77 ########
    78 #########
    6;21

    小结。。
    # 开启一个子进程 start
    # 子进程和主进程是异步
    # 如果在主进程中要等待子进程结束之后再执行某段代码:join
    # 如果有多个子进程 不能在start一个进程之后就立刻join,把所有的进程放到列表中,等待所有进程都start之后再逐一join
    # 守护进程 —— 当主进程的"代码"执行完毕之后自动结束的子进程叫做守护进程

    四、进程锁、队列等概念

    锁、队列稍微重要一些

    锁牺牲效率,但保证了数据的安全--例子

    多进程同时抢购余票-例子,买票时记得上锁,这样只有1个成功了
     1 # #文件db的内容为:{"count":1}
     2 # #注意一定要用双引号,不然json无法识别
     3 # #并发运行,效率高,但竞争写同一文件,数据写入错乱
     4 from multiprocessing import Process,Lock
     5 import time,json,random
     6 def search():
     7     dic=json.load(open('db'))
     8     print('33[43m剩余票数%s33[0m' %dic['count'])
     9 
    10 def get():
    11     dic=json.load(open('db'))
    12     time.sleep(0.1) #模拟读数据的网络延迟
    13     if dic['count'] >0:
    14         dic['count']-=1
    15         time.sleep(0.2) #模拟写数据的网络延迟
    16         json.dump(dic,open('db','w'))
    17         print('33[43m购票成功33[0m')
    18 
    19 def task():
    20     search()
    21     get()
    22 
    23 if __name__ == '__main__':
    24     for i in range(10): #模拟并发100个客户端抢票
    25         p=Process(target=task)
    26         p.start()
    未上锁
     1 from multiprocessing import Process,Lock
     2 import time,json,random
     3 def search():
     4     dic=json.load(open('db'))
     5     print('33[43m剩余票数%s33[0m' %dic['count'])
     6 
     7 def get(num):
     8     dic=json.load(open('db'))
     9     time.sleep(random.random()) #模拟读数据的网络延迟
    10     if dic['count'] >0:
    11         dic['count']-=1
    12         time.sleep(random.random()) #模拟写数据的网络延迟
    13         json.dump(dic,open('db','w'))
    14         print('33[43m%s购票成功33[0m'%num)
    15 
    16 def task(num,lock):
    17     search()
    18     lock.acquire()
    19     get(num)
    20     lock.release()
    21 
    22 if __name__ == '__main__':
    23     lock = Lock()
    24     for i in range(10): #模拟并发100个客户端抢票
    25         p=Process(target=task,args = (i,lock))
    26         p.start()
    买票上锁

    信号量:本质就是 锁+计数器 例子抢ktv
     1 from multiprocessing import Process,Semaphore
     2 import time,random
     3 
     4 def go_ktv(sem,user):
     5     sem.acquire()
     6     print('%s 占到一间ktv小屋' %user)
     7     time.sleep(random.randint(3,5)) #模拟每个人在ktv中待的时间不同
     8     sem.release()
     9     print('%s 走出ktv小屋' % user)
    10 
    11 if __name__ == '__main__':
    12     sem=Semaphore(4)
    13     p_l=[]
    14     for i in range(13):
    15         p=Process(target=go_ktv,args=(sem,'user%s' %i,))
    16         p.start()
    17         p_l.append(p)
    18 
    19     for i in p_l:
    20         i.join()
    21     print('============》')
    View Code

    事件:工作中不常用
    事件内部内置了一个标志
    wait方法 如果这个标志是True,那么wait=pass 绿灯不等待 is_set
    wait方法 如果这个标志是False,那么wait就会陷入阻塞,一直阻塞到标志从False变成True
    # 一个事件在创建之初 内部的标志默认是False
    # Flase -> True set()
    # True -> False clear()
    红绿灯模型
     1 # 红绿灯模型
     2 from multiprocessing import Process, Event
     3 import time, random
     4 
     5 def car(e, n):
     6     while True:
     7         if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
     8             print('33[31m红灯亮33[0m,car%s等着' % n)
     9             e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
    10             print('33[32m车%s 看见绿灯亮了33[0m' % n)
    11             time.sleep(random.randint(3, 6))
    12             if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
    13                 continue
    14             print('车开远了,car', n)
    15             break
    16 
    17 def traffic_lights(e, inverval):
    18     while True:
    19         time.sleep(inverval)   # 先睡3秒
    20         if e.is_set():         # 标志是True
    21             print('######', e.is_set())
    22             e.clear()  # ---->将is_set()的值设置为False
    23         else:                 # 标志是False
    24             e.set()    # ---->将is_set()的值设置为True
    25             print('***********',e.is_set())
    26 
    27 
    28 if __name__ == '__main__':
    29     e = Event()   #e就是事件
    30     t = Process(target=traffic_lights, args=(e, 3))  # 创建一个进程控制红绿灯
    31     for i in range(10):
    32         p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
    33         p.start()
    34     t.start()
    35 
    36     print('============》')
    37 
    38 
    39 
    40 # 10个进程 模拟车 :车的行走要依靠当时的交通灯
    41 # 交通灯是绿灯 车就走
    42 # 交通灯是红灯 车就停 停到灯变绿
    43 # wait 来等灯
    44 # set clear 来控制灯
    View Code

    队列 Queue保证数据安全, 管道+锁=队列
    拿到返回结果
    数字例子
     1 from multiprocessing import Queue,Process
     2 
     3 # def func(n):
     4 #     return n*n
     5 #
     6 # if __name__=='__main__':
     7 #     Process(target=func,args=(100,)).start()
     8 
     9 def func(n,q):
    10     q.put(n*n)  #args要加q,否则TypeError: func() missing 1 required positional argument: 'q'
    11   #put???放钥匙
    12 if __name__=='__main__':
    13     q=Queue()
    14     Process(target=func,args=(100,q)).start()
    15     print(q.get())  #取钥匙
    数字例子
    抢票例子
     1 from multiprocessing import Queue,Process
     2 
     3 def func(num,q):
     4     try:
     5         t = q.get_nowait()
     6         print("%s抢到票了"%num)
     7     except:pass
     8 
     9 if __name__ == '__main__':
    10     q = Queue()
    11     q.put(1)
    12     for i in range(10):
    13         Process(target=func,args=(i,q)).start()
    抢票例子
    # 管道 + 锁  == 队列
    # 管道也是一个可以实现进程之间通信的模型
    # 但是管道没有锁,数据不安全
    
    # 消息中间件
    # memcache
    # rabitmq
    # kafka —— 大数据相关
    # redis

    数据共享:无数据安全
    withlock=加了lock那两句话
     1 from multiprocessing import Manager,Process,Lock
     2 def work(d,lock):
     3     with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
     4         d['count']-=1
     5 
     6 if __name__ == '__main__':
     7     lock=Lock()
     8     m = Manager()
     9     dic = m.dict({'count':100})
    10     #dic = {'count':100}
    11     p_l=[]
    12     for i in range(50):
    13         p=Process(target=work,args=(dic,lock))
    14         p_l.append(p)
    15         p.start()
    16     for p in p_l:
    17         p.join()
    18     print(dic)
    View Code

    一般cpu个数=进程个数,不宜开太多
    五、线程
    进程:是计算机资源分配的最小单位
    线程和进程的关系:
    每一个进程都至少有一个线程
     1 # import os
     2 # import time
     3 # from threading import Thread
     4 # n = 100
     5 # def func(i):
     6 #     global n
     7 #     time.sleep(1)
     8 #     n -= 1
     9 #     print(os.getpid(),'thread%s'%i)
    10 # t_l = []
    11 # for i in range(100):
    12 #     t = Thread(target=func,args=(i,))
    13 #     t.start()
    14 #     t_l.append(t)
    15 # for t in t_l:t.join()
    16 # print('main : ',n)
    View Code

    本质的区别:同一个进程的每个线程之间数据共享
    线程之间也是异步的
    线程是轻量级的,创建一个线程的时间开销要远远小于进程
    线程是CPU调度的最小单位
    小结。。 数据完全共享例子。多并发。
    python中线程的特点
    其他语言中现线程的特点
    # 每个进程里至少有一个主线程负责执行代码
    # 在主线程中可以再开启一个新的线程
    # 在同一个进程中就有两个线程同时在工作了
    # 线程才是CPU调度的最小单位
    # 多个线程之间的数据时共享的
    
    # GIL锁  全局解释器锁
    # 解释器的锅 Cpython解释器的问题
    # 在同一个进程中 同一个时刻 只能有一个线程被CPU执行
    # 导致高计算型 代码 不适合用python的多线程来解决
    # 用多进程或者分布式来解决高计算型代码
    概念

    GIL锁:全局解释器锁----Cpython解释器的问题
    在同一个进程中同一时刻只能有一个线程被cpu执行,
    导致高计算型代码,不合适用python的多线程来解决,
    用多进程或者分布式来解决高计算型代码

    使用多线程处理高计算型场景 python并不占优势,
    在同一个进程中同一时刻只能有一个线程访问CPU
    CPU主要用于计算的,如果程序是高IO类型的,涉及到比较多的网络请求、数据库请求、文件请求
    用celery解决
    多进程--高计算,,分布式
    分布式:大问题分解成小问题,分别去执行,再汇总
    2.守护线程
    例子:使用多线程使用socket--看博客实现
    socketserver内部就是用多线程实现并发的

    # 主线程结束了之后守护线程也同时结束

    守护线程会等待主线程完全结束之后才结束
     1 # from threading import Thread
     2 # import time
     3 # def foo():
     4 #     while True:
     5 #         print(123)
     6 #         time.sleep(1)
     7 #
     8 # def bar():
     9 #     print(456)
    10 #     time.sleep(3)
    11 #     print("end456")
    12 #
    13 # t1 = Thread(target=foo)
    14 # t2 = Thread(target=bar)
    15 #
    16 # t1.daemon = True
    17 # t1.start()
    18 # t2.start()
    19 # print("main-------")
    20 
    21 结果:
    22 123
    23 456
    24 main-------
    25 123
    26 123
    27 123
    28 end456
    View Code
    锁: 当你的程序中出现了取值计算再赋值的操作 数据不安全--需要人为加锁
     1 from threading import Thread,Lock
     2 import time
     3 def work():
     4     global n
     5     lock.acquire()
     6     temp=n
     7     time.sleep(0.1)
     8     n = temp-1
     9     lock.release()
    10 if __name__ == '__main__':
    11     lock=Lock()
    12     n=100
    13     l=[]
    14     for i in range(100):
    15         p=Thread(target=work)
    16         l.append(p)
    17         p.start()
    18     for p in l:
    19         p.join()
    20     print(n)
    21 
    22 # 当你的程序中出现了取值计算再赋值的操作 数据不安全 —— 加锁

    递归锁:死锁---递归锁(一串钥匙)
    抢面例子
     1 from threading import RLock
     2 
     3 lock = RLock()
     4 lock.acquire()
     5 lock.acquire()
     6 print(123)
     7 lock.release()
     8 print(456)
     9 lock.release()
    10 
    11 
    12 # 普通的锁 在同一个线程中 只能acquire一次
    13 # 所以当acquire两次的时候就容易出现死锁现象
    14 # 出现了死锁现象可以使用递归锁去解决问题
    15 # 但是本质上死锁的出现是因为逻辑的错误
    16 # 因此我们更应该把注意力集中在解决逻辑错误
    17 # 而不要在出现错误的时候直接用递归锁规避
    小结

    3.池

    一般情况:
    cpu个数+1 进程数
    cpu个数*5 线程数
    池:会起20个线程
    例子--引入进程加if例子?? shutdown
     1 # import time
     2 # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
     3 # def func(num):
     4 #     print(num)
     5 #     time.sleep(1)
     6 #     print(num)
     7 # if __name__ == '__main__':
     8 #     t = ThreadPoolExecutor(20)
     9 #     for i in range(100):
    10 #         t.submit(func,i)
    11 #     t.shutdown()  # join整个池子
    12 #     print('done')
    20个一起取得方式取数

    map简便算法例子--for+submit
     1 # import os,time,random
     2 # from concurrent.futures import ThreadPoolExecutor
     3 # def task(n):
     4 #     print('%s is runing' %os.getpid(),n)
     5 #     time.sleep(random.randint(1,3))
     6 #     return n**2
     7 # if __name__ == '__main__':
     8 #     executor=ThreadPoolExecutor(max_workers=3)
     9 #     # for i in range(11):
    10 #     #     future=executor.submit(task,i)
    11 #     executor.map(task,range(1,12)) #map取代了for+submit
    1-3个随机取

    callback回调函数
    concurrent.futures callback是由子线程做的
    例子,真实爬取例子
     1 # callback回调函数
     2 # 我有10个http的网页请求
     3 # 我要把这10个网页的上信息分析了
     4 
     5 # import time
     6 # import random
     7 # from concurrent.futures import ThreadPoolExecutor
     8 # from threading import current_thread
     9 # urls=[
    10 #         'https://www.baidu.com',
    11 #         'https://www.python.org',
    12 #         'https://www.openstack.org',
    13 #         'https://help.github.com/',
    14 #         'http://www.sina.com.cn/'
    15 #         'http://www.cnblogs.com/'
    16 #         'http://www.sogou.com/'
    17 #         'http://www.sohu.com/'
    18 #     ]
    19 #
    20 # def analies(content):
    21 #     print('分析网页',current_thread())
    22 #     print(content.result())
    23 #
    24 # def get_url(url):
    25 #     print('爬取网页',current_thread())
    26 #     time.sleep(random.uniform(1,3))
    27 #     # analies(url*10)
    28 #     return url*10
    29 #
    30 # t = ThreadPoolExecutor(3)
    31 # print('主线程',current_thread())
    32 # for url in urls:
    33 #     t.submit(get_url,url).add_done_callback(analies)
    34 
    35 # concurrent.futures callback是由子线程做的
    爬取
     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
     4 # from multiprocessing import Pool
     5 import requests   # 需要你安装一下 是一个扩展模块 pip install requests
     6 import os
     7 
     8 def get_page(url):
     9     print('<进程%s> get %s' %(os.getpid(),url))
    10     respone=requests.get(url)
    11     if respone.status_code == 200:
    12         return {'url':url,'text':respone.text}
    13 
    14 def parse_page(res):
    15     res=res.result()
    16     print('<进程%s> parse %s' %(os.getpid(),res['url']))
    17     parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
    18     with open('db.txt','a') as f:
    19         f.write(parse_res)
    20 
    21 
    22 if __name__ == '__main__':
    23     # ret = get_page('https://www.baidu.com')
    24     # print(ret)
    25     urls=[
    26         'https://www.baidu.com',
    27         'https://www.python.org',
    28         'https://www.openstack.org',
    29         'https://help.github.com/',
    30         'http://www.sina.com.cn/'
    31     ]
    32 
    33     # p=Pool(3)
    34     # for url in urls:
    35     #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    36     # p.close()
    37     # p.join()
    38 
    39     p=ProcessPoolExecutor(3)
    40     for url in urls:
    41         p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    真实爬取
    1 url:<https://www.baidu.com> size:[2443]
    2 url:<https://help.github.com/> size:[128922]
    3 url:<http://www.sina.com.cn/> size:[590631]
    4 url:<https://www.openstack.org> size:[64120]
    5 url:<https://www.python.org> size:[48756]
    db中内容

    六、协程

    协程:把一个线程拆分成几个  500     5w个并发 进程+线程+协程
    进程、线程都是操作系统在调度
    协程 是程序级别调度---减轻了操作系统的负担、增强了用户对程序的可控性
    例子--等待帮助切换,只认识gevent里的sleep,可以加importmonkey。。
    爬虫例子
    看博客中例子
    
    
     1 # 进程 计算机中资源分配的最小单位   cpu+1
     2 # 线程 CPU 调度最小单位            cpu*5
     3 # 协程 把一个线程拆分成几个         500
     4 
     5 # 进程 线程 都是操作系统在调度
     6 # 协程 是程序级别调度
     7 
     8 # 减轻了操作系统的负担、增强了用户对程序的可控性
     9 # from gevent import monkey;monkey.patch_all()
    10 # import gevent
    11 # import time
    12 # def eat(name):
    13 #     print('%s eat 1' %name)
    14 #     time.sleep(2)
    15 #     print('%s eat 2' %name)
    16 #
    17 # def play(name):
    18 #     print('%s play 1' %name)
    19 #     time.sleep(1)
    20 #     print('%s play 2' %name)
    21 #
    22 #
    23 # g1=gevent.spawn(eat,'egon')
    24 # g2=gevent.spawn(play,name='egon')
    25 # g1.join()
    26 # g2.join()
    27 # # gevent.joinall([g1,g2])
    28 # print('')
    View Code
    
    
    from gevent import monkey;monkey.patch_all()
    import gevent
    import requests
    import time
    
    def get_page(url):
        print('GET: %s' %url)
        response=requests.get(url)
        if response.status_code == 200:
            print('%d bytes received from %s' %(len(response.text),url))
    
    
    start_time=time.time()
    gevent.joinall([
        gevent.spawn(get_page,'https://www.python.org/'),
        gevent.spawn(get_page,'https://www.yahoo.com/'),
        gevent.spawn(get_page,'https://github.com/'),
    ])
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    gevent
     
  • 相关阅读:
    HTTP状态码
    CentOS 7 上安装vim(默认未安装)
    yum安装提示Another app is currently holding the yum lock; waiting for it to exit...
    CentOS 7 安装telnet服务
    shell编程
    shell基础
    ssh相关命令
    ssh无密码连接
    centos7小命令
    日志管理
  • 原文地址:https://www.cnblogs.com/lijie123/p/9175179.html
Copyright © 2011-2022 走看看