zoukankan      html  css  js  c++  java
  • 第十五章 并发编程

    1.操作系统的发展史                                            

    参考:http://www.cnblogs.com/Eva-J/articles/8253521.html

    知识点

    输入输出 -- 大部分时间都不会占用CPU,但会降低你程序的效率

    操作系统的三种基本类型:多道批处理系统、分时系统、实时系统。

     现在操作系统

    基于多道批处理系统和分时系统

    多个程序、作业在遇到IO操作的时候,操作系统会帮助你进行切换

    让CPU的利用率得到最大的提高

    2.进程                                                        

    初识进程

    进程: 运行中的程序

    操作系统 只负责管理调度进程

    进程是操作系统中资源分配的最小单位

    每一个运行中的程序都需要有自己的内存、资源

    都分配给进程 记录执行的状态 管理自己的内存资源

    在Python中,每一个运行的程序 都是一个进程

    如何使用python来开启一个进程--我们使用multiprocessing模块,process-进程,multi多元的

    并发编程例:

    import os
    import time
    from multiprocessing import Process
    def func(num):
        print(num,os.getpid())
        time.sleep(0.5)
        print(num,os.getpid())
        time.sleep(0.5)
        print(num, os.getpid())
        time.sleep(0.5)
        print(num, os.getpid())
    
    # 创建一个进程
    if __name__ == '__main__':  # windows,开启子进程的时候,会执行import
        print(os.getpid())
        p = Process(target=func, args=(10,))
        p.start() # 开启进程
        print(os.getpid())
        time.sleep(1)
        print(os.getpid(),1)
        time.sleep(1)
        print(os.getpid(),2)
    
    # 运行结果
    #                  时间轴
    # 17500              0
    # 17500              0
    # 10 12656           0+
    # 10 12656           0.5
    # 17500 1            1
    # 10 12656           1+
    # 10 12656           1.5+
    # 17500 2            2
    
    

    几个概念:

    同步:一个时刻只能做一件事

    异步:同时做多件事

    异步可以有效地提高程序的效率

    进程:主进程中开启的新的进程

    主进程:运行的这个程序

    父进程:创造出子进程的进程

    注意:进程与进程之间都是异步的,而且开启一个进程是有时间开销的

    进程的进阶

    什么是进程:运行中的程序,计算机中最小的资源分配单位

    程序开始执行就会产生一个主进程

    python中可以主进程中用代码启动一个进程 -- 子进程

    同时主进程也被称为父进程父子进程之间的代码执行是异步的,各自执行自己的

    父子进程之间的数据不可以共享

    例:

    import time
    from multiprocessing import Process
    
    n = 100
    def func():
        global n
        n = 0
        print(n)
        print('----------')
    
    if __name__ == '__main__':
        Process(target=func).start()
        time.sleep(1)
        print(n)
    # 0
    # ----------
    # 100
    # 说明:父进程中n的值并没有随着子进程的运行而改变

     

    进程什么时候结束

    主进程什么时候结束:主进程会等待子进程结束之后再结束,父进程会等待回收子进程所占用的资源

    例:直接开启多个子进程

    import time
    from multiprocessing import Process
    
    n = 100
    def func(n):
        time.sleep(1)
        print('-'*n)
    
    if __name__ == '__main__':
        Process(target=func, args=(1,)).start()
        Process(target=func, args=(2,)).start()
        Process(target=func, args=(3,)).start()
        Process(target=func, args=(4,)).start()
    
    # 说明:主要看时间片的轮转,是一个随机事件
    # -
    # ----
    # ---
    # --

    例:使用for循环开启多个子进程

    import time
    from multiprocessing import Process
    
    n = 100
    def func(n):
        time.sleep(1)
        print('-'*n)
    
    if __name__ == '__main__':
        for i in range(10):
            Process(target=func, args=(i,)).start()
    # -
    # --
    # --------
    # ----
    # ---
    # -----
    # ---------
    # ------
    # -------

    阻塞

    例:使用阻塞(join)开启子进程

    import time
    from multiprocessing import Process
    
    n = 100
    def func(n):
        time.sleep(1)
        print('-'*n)
    
    if __name__ == '__main__':
        p = Process(target=func, args=(1,))
        p.start()
        print('子进程开始了')
        p.join() # 阻塞 直到子进程都执行完毕
        print('十条信息已经都发送完了')
    # 子进程开始了
    # -
    # 十条信息已经都发送完了

    例:使用阻塞(join)循环开启多个子进程

    import time
    from multiprocessing import Process
    
    n = 100
    def func(n):
        time.sleep(1)
        print('-'*n)
    
    if __name__ == '__main__':
        lst = []
        for i in range(10):
            p = Process(target=func, args=(i,))
            p.start()
            lst.append(p)
        for p in lst:p.join()
    
        print('十条信息已经都发送完了')
    
    # -----
    # -
    # ---
    # -------
    # --
    # ---------
    # ----
    # --------
    # ------
    # 
    # 十条信息已经都发送完了

    守护进程

    守护进程也是一个子进程

    定义:当主进程的代码执行完毕之后自动结束的子进程叫做守护进程

    例:

    import time
    from multiprocessing import Process
    def deamon_func():
        while True:
            print('我还活着')
            time.sleep(0.5)
    
    
    if __name__ == '__main__':
        p = Process(target=deamon_func)
        # 开启守护进程
        p.daemon = True
        p.start()
        for i in range(3):
            print(i*'*')
            time.sleep(0.4)
    
    # 我还活着
    # *
    # 我还活着
    # **
    # 我还活着
    # 说明:正常情况下,父进程会等待子进程结束才结束
    # 而设置守护进程后,父进程结束了,子进程的死循环即可停止

    练习:分析程序请问会打印多少个我还活着

    import time
    from multiprocessing import Process
    def deamon_func():
        while True:
            print('我还活着')
            time.sleep(0.5)
    
    def wahaha():
        for i in range(10):
            time.sleep(1)
            print(i*'#')
    
    if __name__ == '__main__':
        Process(target=wahaha).start()
        p = Process(target=deamon_func)
        p.daemon = True
        p.start()
        for i in range(3):
            print(i*'*')
            time.sleep(1)
    
    
    # 我还活着
    # 我还活着
    # *
    #
    # 我还活着
    # 我还活着
    # **
    # #
    # 我还活着
    # 我还活着
    # ##
    # ###
    # ####
    # #####
    # ######
    # #######
    # ########
    # #########
    # 说明:因为'我还活着'是守护进程打印的,所以当父进程结束后,停止打印,3/0.5=6次
    # 但程序不会停止,需要等待子进程wahaha结束

    例:主进程添加阻塞

    import time
    from multiprocessing import Process
    def deamon_func():
        while True:
            print('我还活着')
            time.sleep(0.5)
    
    def wahaha():
        for i in range(10):
            time.sleep(1)
            print(i*'#')
    
    if __name__ == '__main__':
        p2 = Process(target=wahaha)
        p2.start()
        p = Process(target=deamon_func)
        p.daemon = True
        p.start()
        for i in range(3):
            print(i*'*')
            time.sleep(1)
        p2.join()
    
    # 我还活着
    # 我还活着
    # *
    # 
    # 我还活着
    # 我还活着
    # **
    # #
    # 我还活着
    # 我还活着
    # ##
    # 我还活着
    # 我还活着
    # ###
    # 我还活着
    # 我还活着
    # ####
    # 我还活着
    # 我还活着
    # #####
    # 我还活着
    # 我还活着
    # ######
    # 我还活着
    # 我还活着
    # #######
    # 我还活着
    # 我还活着
    # ########
    # 我还活着
    # 我还活着
    # #########
    # 我还活着

    总结

    # 开启一个子进程 start
    # 子进程和主进程是异步的
    # 如果在主进程中要等待子进程结束之后再执行某行代码:join
    # 如果有多个子进程 不能在start一个进程之后就立刻join,把所有的进程放到列表中
    # 等待所有的进程都start之后再逐一join
    # 守护进程 -- 当主进程的代码执行完毕之后自动结束的子进程叫做守护进程

    添加锁是为了保证程序的运行顺序,牺牲了效率但是保护数据的安全

    例:未添加阻塞、锁

    import os
    import time
    import random
    from multiprocessing import Process,Lock
    
    def work(n):
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
    
    if __name__ == '__main__':
        for i in range(3):
            p=Process(target=work,args=(i,  ))
            p.start()
    # 0: 23192 is running
    # 2: 6064 is running
    # 1: 22560 is running
    # 1:22560 is done
    # 0:23192 is done
    # 2:6064 is done
    # 说明:在没有使用阻塞,锁的时候,因为时间片是随机事件,所以随机子进程开始

    例:使用阻塞保证子进程的独立运行

    import os
    import time
    import random
    from multiprocessing import Process,Lock
    
    def work(n):
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
    
    if __name__ == '__main__':
        for i in range(3):
            p=Process(target=work,args=(i,  ))
            p.start()
            p.join()
    
    # 0: 13184 is running
    # 0:13184 is done
    # 1: 22868 is running
    # 1:22868 is done
    # 2: 22200 is running
    # 2:22200 is done

    例:使用锁

    import os
    import time
    import random
    from multiprocessing import Process,Lock
    
    def work(n, lock):
        lock.acquire()
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(3):
            p=Process(target=work,args=(i,lock))
            p.start()
            
    # 0: 18864 is running
    # 0:18864 is done
    # 1: 23264 is running
    # 1:23264 is done
    # 2: 15820 is running
    # 2:15820 is done

    信号量

    信号量本质是锁+计数器

    from multiprocessing import Process,Semaphore
    import time,random
    
    def go_ktv(sem,user):
        sem.acquire()
        print('%s 占到一间ktv小屋' %user)
        time.sleep(random.randint(3,5)) #模拟每个人在ktv中待的时间不同
        sem.release()
        print('%s 走出ktv小屋' % user)
    
    if __name__ == '__main__':
        sem=Semaphore(4)
        p_l=[]
        for i in range(13):
            p=Process(target=go_ktv,args=(sem,'user%s' %i,))
            p.start()
            p_l.append(p)
    
        for i in p_l:
            i.join()
        print('============》')
    
    # 信号量的本质就是 锁+计数器
    # user12 占到一间ktv小屋
    # user8 占到一间ktv小屋
    # user0 占到一间ktv小屋
    # user7 占到一间ktv小屋
    # user12 走出ktv小屋
    # user4 占到一间ktv小屋
    # user6 占到一间ktv小屋
    # user0 走出ktv小屋
    # user8 走出ktv小屋
    # user3 占到一间ktv小屋
    # user7 走出ktv小屋
    # user11 占到一间ktv小屋
    # ******
    # ============》
    # 说明:程序设置Semaphore(4),表示同时只能有四个子进程存在,如果有结束的,再添加新的,知道所有子进程结束

    事件

    事件内部内置了一个标志
    wait 方法 如果这个标志是True,那么wait == pass
    wait 方法 如果这个标志是False,那么wait就会陷入阻塞,一直阻塞到标志从False变成True

    一个事件在创建之初 内部的标志默认是False
    False -> True set()
    True -> False clear()

    红绿灯模型

    # 红绿灯模型
    from multiprocessing import Process, Event
    import time, random
    
    def car(e, n):
        while True:
            if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
                print('33[31m红灯亮33[0m,car%s等着' % n)
                e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
                print('33[32m车%s 看见绿灯亮了33[0m' % n)
                time.sleep(random.randint(3, 6))
                if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                    continue
                print('车开远了,car', n)
                break
    
    def traffic_lights(e, inverval):
        while True:
            time.sleep(inverval)   # 先睡3秒
            if e.is_set():         # 标志是True
                print('######', e.is_set())
                e.clear()  # ---->将is_set()的值设置为False
            else:                 # 标志是False
                e.set()    # ---->将is_set()的值设置为True
                print('***********',e.is_set())
    
    
    if __name__ == '__main__':
        e = Event()   #e就是事件
        t = Process(target=traffic_lights, args=(e, 3))  # 创建一个进程控制红绿灯
        for i in range(10):
            p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
            p.start()
        t.start()
    
        print('============》')
    
    
    
    # 10个进程 模拟车 :车的行走要依靠当时的交通灯
    # 交通灯是绿灯 车就走
    # 交通灯是红灯 车就停 停到灯变绿
    # wait 来等灯
    # set clear 来控制灯

    队列

    管道+锁 = 队列

    管道也是一个可以实现进程之间通信的模型

    但是管道没有锁,数据不安全

    from multiprocessing import Queue,Process
    
    def func(q,num):
        try:
            t = q.get_nowait()
            print("%s抢到票了"%num)
        except:pass
    
    if __name__ == '__main__':
        q = Queue()
        q.put(1)
        for i in range(10):
            Process(target=func,args=(q,i)).start()

    3.线程                                                        

    什么是进程 :是计算机资源分配的最小单位

    什么是线程 :是CPU调度的最小单位

    线程和进程的关系:

      每个进程中都至少有一个线程

    多线程服务的时候才能感受到线程的存在

    本质的区别:

      同一个进程的每个线程之间数据共享

      线程之间也是异步的

      线程是轻量级的:创建一个线程的时间开销要远远小于进程

      线程是CPU调度的最小单位

    例:线程的开启

    import os
    import time
    from threading import Thread
    def func(i):
        print(os.getpid())
        time.sleep(1)
        print('thread',i)
    
    Thread(target=func,args=(1,)).start()
    print(123*'*')
    print('main', os.getpid())
    # 23280
    # ***************************************************************************************************************************
    # main 23280
    # thread 

    总结:

    # 每一个进程里至少有一个主线程负责执行代码
    # 在主线程中可以再开启一个新的线程
    # 在同一个进程中就有两个线程同时在工作了
    # 线程才是CPU调度的最小单位
    # 多个线程之间的数据是共享的
    # GIL锁 全局解释器锁
    # 解释器的锅 Cpython解释器的问题
    # 在同一个进程中 同一时刻 只能有一个线程被CPU执行
    # 导致高计算型 代码 不适合用python的多线程来解决
    # 用多进程或者分布式来解决高计算型代码

    守护线程

    例:

    from threading import Thread
    import time
    def foo():
        while True:
            print(123)
            time.sleep(1)
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    
    t1.daemon = True
    t1.start()
    t2.start()
    print("main-------")
    
    
    # 主线程结束了之后守护线程也同时结束
    # 守护线程会等待主线程完全结束之后才结束

    例:

    from threading import Thread,Lock
    import time
    def work():
        global n
        lock.acquire()
        temp=n
        time.sleep(0.1)
        n = temp-1
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        n=100
        l=[]
        for i in range(100):
            p=Thread(target=work)
            l.append(p)
            p.start()
        for p in l:
            p.join()
        print(n)
    
    # 当你的程序中出现了取值计算再赋值的操作 数据不安全 —— 加锁

    线程池

    程序可以开启的进程数和线程数

    CPU的个数+1 进程数

    CPU的个数*5 线程数

    池的介绍
    #1 介绍
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数

    例:

    import time
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    def func(num):
        print(num)
        time.sleep(1)
        print(num)
    if __name__ == '__main__':
        t = ThreadPoolExecutor(20)
        for i in range(100):
            t.submit(func,i)
        t.shutdown()  # join整个池子,必须添加
        print('done')
    # 说明:线程会以20个为一组开启

    例:使用map取代了for+submit

    import os,time,random
    from concurrent.futures import ThreadPoolExecutor
    def task(n):
        print('%s is runing' %os.getpid(),n)
        time.sleep(random.randint(1,3))
        return n**2
    if __name__ == '__main__':
        executor=ThreadPoolExecutor(max_workers=3)
        executor.map(task,range(1,12)) #map取代了for+submit

    callback回掉函数 - 当拿到返回值的时候,立刻进行某操作

    我有10个http的网页请求,把这10个网页上的信息分析了

    例:

    import time
    import random
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
            'http://www.cnblogs.com/'
            'http://www.sogou.com/'
            'http://www.sohu.com/'
        ]
    
    def analies(content):
        print('分析网页',current_thread())
        print(content.result())
    
    def get_url(url):
        print('爬取网页',current_thread())
        time.sleep(random.uniform(1,3))
        # analies(url*10)
        return url*10
    
    t = ThreadPoolExecutor(3)
    print('主线程',current_thread())
    for url in urls:
        t.submit(get_url,url).add_done_callback(analies)
    
    concurrent.futures callback是由子线程做的

    线程的其他知识与进程类似

    4.协程                                                        

    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    进程 计算机中资源分配的最小单位 cpu+1

    线程 CPU 调度最小单位 cpu*5

    协程 把一个线程拆分成几个 500

    进程 线程 都是操作系统在调度

    协程 是程序级别调度

    减轻了操作系统的负担、增强了用户对程序的可控性

    例1:

    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(2)
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        time.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    g1.join()
    g2.join()
    # gevent.joinall([g1,g2])
    print('')
    # egon eat 1
    # egon play 1
    # egon play 2
    # egon eat 2
    #

    例2:

    from gevent import monkey;monkey.patch_all()
    import gevent
    import requests
    import time
    
    def get_page(url):
        print('GET: %s' %url)
        response=requests.get(url)
        if response.status_code == 200:
            print('%d bytes received from %s' %(len(response.text),url))
    
    
    start_time=time.time()
    gevent.joinall([
        gevent.spawn(get_page,'https://www.python.org/'),
        gevent.spawn(get_page,'https://www.yahoo.com/'),
        gevent.spawn(get_page,'https://github.com/'),
    ])
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    # GET: https://www.python.org/
    # GET: https://www.yahoo.com/
    # GET: https://github.com/
    # 48801 bytes received from https://www.python.org/
    # 527368 bytes received from https://www.yahoo.com/
    # 59420 bytes received from https://github.com/
    # run time is 6.552013397216797
  • 相关阅读:
    intel Skylake平台安装WIN7
    复杂表单
    sqlalchemy的cascades
    新版mysql 5.7的group_by非常不和谐
    sqlalchemy使用tip
    sql分组数据去重
    flask-sqlalchemy relationship
    老笔记本装xubuntu+win7
    [leetcode]Reorder List
    [leetcode]Insertion Sort List
  • 原文地址:https://www.cnblogs.com/gongniue/p/9214480.html
Copyright © 2011-2022 走看看