zoukankan      html  css  js  c++  java
  • python中多线程,多进程,多协程概念及编程上的应用

    1, 多线程

    1.  线程是进程的一个实体,是CPU进行调度的最小单位,他是比进程更小能独立运行的基本单位。
    2.  线程基本不拥有系统资源,只占用一点运行中的资源(如程序计数器,一组寄存器和栈),但是它可以与同属于一个进程的其他线程共享全部的资源。
    3.  提高程序的运行速率,上下文切换快,开销比较少,但是不够稳定,容易丢失数据,形成死锁。

    直接上代码:

    import time
    import threading
    
    # 函数1用时2秒
    def fun1():
        time.sleep(2)
        print(threading.current_thread().name, time.ctime())
    
    # 函数2用时4秒
    def fun2():
        time.sleep(4)
        print(threading.current_thread().name, time.ctime())
    
    # 函数3用时6秒
    def fun3():
        time.sleep(6)
        print('hello python', time.ctime())
    
    th1 = threading.Thread(target=fun1)
    th2 = threading.Thread(target=fun2)
    th3 = threading.Thread(target=fun3)
    
    th1.start()
    th2.start()
    th3.start()

    打印结果:

    Thread-1 Mon Jan  7     11:01:52 2019
    Thread-2 Mon Jan  7     11:01:54 2019
    hello python Mon Jan  7 11:01:56 2019

    解析:从结果看出,他们同一时间 11:01:50开始执行,分别用了不同的时间结束

    接着往下看,添加join阻塞线程

    ''''''

    th1.start() th1.join() th2.start() th2.join() th3.start() th3.join()

    打印结果:

    Thread-1 Mon Jan 7     11:19:00 2019
    Thread-2 Mon Jan 7     11:19:04 2019
    hello python Mon Jan 7 11:19:10 2019

    我们看到这三线程按顺序依次执行。

    我们接着看看线程的方法使用:

    threading.enumerate()            #列举线程,返回列表,其中里面会有一条主线程
    threading.activeCount()          #查看线程运行个数
    threading.current_thread().name     #查看当前运行线程名称
    join()                      #阻塞线程运行

    我们接着看第二种开线程的方式:

    import threading
    import time
    
    class MyThread(threading.Thread):
        def run(self):
            for i in range(3):
                time.sleep(1)
                msg = "I'm "+self.name+' @ '+str(i) #name属性中保存的是当前线程的名字
                print(msg)
    
    if __name__ == '__main__':
        t = MyThread()
        t.setName('yangzhenyu')
        a = t.isAlive()
        print(a)
        print(t.getName())
    
        t.start()
        b = t.isAlive()
        print(b)

    打印结果:

    False
    yanzghenyu
    True
    I'm yanzghenyu @ 0
    I'm yanzghenyu @ 1
    I'm yanzghenyu @ 2

    方法总结:

    t.setName()    #设置运行线程名称,不指定默认Thread-1 
    t.getName()    #获取线程名称
    t.isAlive()        #判断线程是否运行,返回布尔类型   

    线程间共享全局变量:

    import threading
    import time
    
    n = 100
    
    def work01():
        global n
        for i in range(3):
            n += 1
        print(n)                          //103
    
    def work02():
        global n
        print(n)                         //103
    
    print(n)                             //100
    
    t1 = threading.Thread(target=work01)
    t1.start()
    time.sleep(
    1)
    t2
    = threading.Thread(target=work02) t2.start()

    关于线程锁 

    1. 用threading.Lock()创建锁,用acquire()申请锁,每次只有一个线程获得锁,其他线程必须等此线程release()后才能获得锁
    2. RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。
    3. 注意:如果使用RLock,那么acquire和release必须成对出现,即同一线程中调用了n次acquire,必须调用n次的release才能真正释放所占用的琐

    下面例子中我们用到的是Lock(),当加完锁之后,该方法同一时间内只能被一个线程调用。

    import threading
    mylock=threading.Lock()#创建锁
    num = 0
    def add_num(name):
        global num
        while True:
            mylock.acquire()#申请锁也就是加锁
            print('thread %s locked! num=%d'%(name,num))
            if num>=5:
                print('thread %s release! num=%d'%(name,num))
                mylock.release()#释放锁
                return 
            num += 1
            print('thread %s release! num = %d'%(name,num))
            mylock.release()
    
    t1 = threading.Thread(target=add_num,args=('A',))
    t2 = threading.Thread(target=add_num,args=('B',))
    t1.start()
    t2.start()

    打印结果:

    thread A locked! num=0
    thread A release! num = 1
    thread A locked! num=1
    thread A release! num = 2
    thread A locked! num=2
    thread A release! num = 3
    thread A locked! num=3
    thread A release! num = 4
    thread A locked! num=4
    thread A release! num = 5
    thread A locked! num=5
    thread A release! num=5
    thread B locked! num=5
    thread B release! num=5

     

    cpu io密集型适合用多线程进行开发



    关于进程:

    • 进程是系统进行资源分配的最小单位,每个进程都有自己的独立内存空间,不用进程通过进程间通信来通信。
    • 但是进程占据独立空间,比较重量级,所以上下文进程间的切换开销比较大,但是比较稳定安全。

    进程创建:

    第一种创建进程的方式:

    from multiprocessing import Process
    import time
    import random
    import os
    
    
    def piao(name):
        print("%s is piaoping"%name)
        time.sleep(random.randint(0,1))
        print("%s is piao end"%name)
    
    if __name__ == '__main__':
        print("CPU的个数是:%d"%os.cpu_count())
        p1 = Process(target=piao,args=("alex",),name="进程1")
        print(p1.name)
        p1.start()
        print("父进程!") #执行速度要远快于建立新进程的时间

    打印结果:

    CPU的个数是:2
    进程1
    父进程!
    alex is piaoping
    alex is piao end

    第二种创建进程的方式:

    from multiprocessing import Process
    import time
    import random
    
    #继承Process类,并实现自己的run方法
    
    class Piao(Process):
        def __init__(self,name):
            #必须调用父类的init方法
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s is piaoing"%self.name)
            time.sleep(random.randint(1,3))
            print("%s is piaoeng"%self.name)
    
    
    if __name__ == '__main__':
        p1 = Piao("Alex")
        #开辟一个新的进程实际上就是执行本进程所对应的run()方法
        p1.start()
        print("主进程!")

    结果:

    主进程!
    Alex is piaoing
    Alex is piaoeng

    解析:join括号中不携带参数,表示父进程在这个位置要等待p1进程执行完成后,如果指定参数,也就是等待时间s,那么主进程将在这个时间内结束,

         用is_active()  方法即可检测进程的状态,不加join() 返回True,表示进程还在进行。

    进程的方法,

    start()    启动进程实例(创建子进程);
    terminate():不管任务是否完成,立即终止;
    name:    当前进程实例别名,默认为Process-N,N为从1开始递增的整数;
    pid:     当前进程实例的PID值;  os.getpid()
    is_alive(): 判断进程实例是否还在执行;
    join([timeout]):是否等待进程实例执行结束,或等待多少秒;

    进程池:

      在程序实际处理问题时,忙时会有成千上万个任务需要执行,闲时有零星任务,创建时需要消耗时间,销毁也需要时间,

    即使开启成千上万个进程,操作系统也不能 让他同时执行。这里就用到了进程池,用于管理小块内存的申请与释放。

    ,

    1,上代码:

    from multiprocessing.pool import Pool
    from time import sleep
    
    
    def fun(a):
        sleep(1)
        print(a)
    
    if __name__ == '__main__':
        p = Pool()   # 这里不加参数,但是进程池的默认大小,等于电脑CPU的核数
                # 也是创建子进程的个数,也是每次打印的数字的个数
        for i in range(10):
            p.apply_async(fun, args=(i,))
    p.close() p.join()
    # 等待所有子进程结束,再往后执行 print("end")

    2,callback 举例:

    from multiprocessing import Process,Pool
    
    
    def func(i):
        i+=1
        return i#普通进程处理过的数据返回给主进程p1
    
    def call_back(p1):
        p1+=1
        print(p1)
    
    if __name__ == '__main__':
        p = Pool()
        for i in range(10):
            p1 = p.apply_async(func,args=(i,),callback = call_back)#p调用普通进程并且接受其返回值,将返回值给要执行的回调函数处理
        p.close()
        p.join()

    解析:   1,p.apply ( func,args = ())     同步的效率,也就是说池中的进程一个一个的去执行任务

          p.apply_async( func,args = () , callback = None) : 异步的效率,也就是池中的进程一次性都去执行任务.

        2,异步处理任务时 : 必须要加上close和join. 进程池的所有进程都是守护进程(主进程代码执行结束,守护进程就结束). 

        3,func : 进程池中的进程执行的任务函数

           4,args : 可迭代对象性的参数,是传给任务函数的参数

           5,callback : 回调函数,就是每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,

                    由回调函数进行进一步处理,回调函数只异步才有,同步没有.回调函数是父进程调用.

    3. map( func,iterable)  (该方法经常用到爬虫)

    from multiprocessing import Pool
    
    def func(num):
        num += 1
        print(num)
        return num
    
    if __name__ == '__main__':
        p = Pool(2)
        res = p.map(func,[i for i in range(100)])
        # p.close()#map方法自带这两种功能
        # p.join()
        print('主进程中map的返回值',res)

    func : 进程池中的进程执行的任务函数

    iterable : 可迭代对象,是把可迭代对象那个中的每个元素一次传给任务函数当参数.

    map方法自带close和join

    进程间的通信:

    1)队列

    from multiprocessing import Queue,Process
    import os,time,random
    
    #添加数据函数
    def proc_write(queue,urls):
        print("进程(%s)正在写入..."%(os.getpid()))
        for url in urls:
            queue.put(url)
            print("%s被写入到队列中"%(url))
            time.sleep(random.random()*3)
    
    #读取数据函数
    def proc_read(queue):
        print("进程(%s)正在读取..."%(os.getpid()))
    
        while True:
            url = queue.get()
            print("从队列中提取到:%s"%(url))
    
    if __name__ =="__main__":
    queue
    = Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()

    生产者与消费者模式(线程间的通信):

    from queue import Queue
    import threading,time
    
    
    class Producer(threading.Thread):
        def run(self):
            global queue
            count = 0
            while True:
                if queue.qsize() < 1000:
                    for i in range(100):
                        count = count +1
                        msg = '生成产品'+str(count)
                        queue.put(msg)
                        print(msg)
                time.sleep(0.5)
    
    class Consumer(threading.Thread):
        def run(self):
            global queue
            while True:
                if queue.qsize() > 100:
                    for i in range(3):
                        msg = self.name + '消费了 '+queue.get()
                        print(msg)
                time.sleep(1)
    
    
    if __name__ == '__main__':
        queue = Queue()
    
        for i in range(500):
            queue.put('初始产品'+str(i))
        for i in range(2):
            p = Producer()
            p.start()
        for i in range(5):
            c = Consumer()
            c.start()

    2) 进程间的通信(管道)

    from multiprocessing import Pipe,Process
    import random,time,os
    
    def proc_send(pipe,urls):
        for url in urls:
            print("进程(%s)发送:%s"%(os.getpid(),url))
            pipe.send(url)
            time.sleep(random.random())
    
    def proc_recv(pipe):
        while True:
            print("进程(%s)接收到:%s"%(os.getpid(),pipe.recv()))
            time.sleep(random.random())
    
    if __name__ == "__main__":
        pipe = Pipe()
        p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],))   
        p2 = Process(target=proc_recv,args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.terminate()

    解析:

      pipe用于两个进程间的通信,两个进程分别位于管道的两端,Pipe方法返回(conn1,conn2)代表一个管道的两端,

      Pipe方法有dumplex参数,若该参数为True,管道为全双工模式,

      若为Fasle,conn1只负责接收消息,conn2只负责发送消息.send和recv方法分别是发送和接收消息的方法



    协程:

    协程:是更小的执行单位,是一种轻量级的线程,协程的切换只是单纯的操作CPU的上下文,所以切换速度特别快,且耗能小。

    gevent是第三方库,通过greenlet实现协程,其基本思想是:

    当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

    由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

    from gevent import monkey
    
    monkey.patch_all()  # 用来在运行时动态修改已有的代码,而不需要修改原始代码。
    
    import gevent
    import requests
    
    
    def f(url):
        print('GET: %s' % url)
        html = requests.get(url).text
        print(url, len(html))
    
    
    gevent.joinall([
        gevent.spawn(f, 'http://i.maxthon.cn/'),  # 先执行这个函数,发送请求,等待的时候发送第二个请求
        gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'),
        gevent.spawn(f, 'http://edu.51cto.com/?jydh')])

    运行结果:

    GET: http://i.maxthon.cn/
    GET: http://www.jianshu.com/u/3cfeb3395a95
    GET: http://edu.51cto.com/?jydh
    http://i.maxthon.cn/ 461786
    http://edu.51cto.com/?jydh 353858
    http://www.jianshu.com/u/3cfeb3395a95 597

    从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。

    使用gevent,可以获得极高的并发性能,但gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。

  • 相关阅读:
    【2019.7.10】树上差分 杂[LCA 倍增][树上差分 点差分 边差分]
    【luogu4145】 上帝造题的七分钟2 / 花神游历各国 [线段树]
    【luogu1198】 [JSOI2008]最大数 [线段树]
    【luogu2783】 有机化学之神偶尔会做作弊 [tarjan 缩点][LCA]
    【luogu3388】 【模板】割点(割顶)[tarjan 割点]
    【luogu2272】 [ZJOI2007]最大半连通子图 [tarjan 缩点][拓扑排序]
    【luogu2194】HXY烧情侣 [tarjan 缩点]
    【luogu3627】 [APIO2009]抢掠计划 [tarjan 缩点][最短路]
    【luogu3398】 仓鼠找sugar [LCA 倍增]
    【luogu2746】 [USACO5.3]校园网Network of Schools [tarjan 缩点]
  • 原文地址:https://www.cnblogs.com/lvye001/p/10232377.html
Copyright © 2011-2022 走看看