zoukankan      html  css  js  c++  java
  • python中多线程,多进程,多协程概念及编程上的应用

    1, 多线程

    1.  线程是进程的一个实体,是CPU进行调度的最小单位,他是比进程更小能独立运行的基本单位。
    2.  线程基本不拥有系统资源,只占用一点运行中的资源(如程序计数器,一组寄存器和栈),但是它可以与同属于一个进程的其他线程共享全部的资源。
    3.  提高程序的运行速率,上下文切换快,开销比较少,但是不够稳定,容易丢失数据,形成死锁。

    直接上代码:

    import time
    import threading
    
    # 函数1用时2秒
    def fun1():
        time.sleep(2)
        print(threading.current_thread().name, time.ctime())
    
    # 函数2用时4秒
    def fun2():
        time.sleep(4)
        print(threading.current_thread().name, time.ctime())
    
    # 函数3用时6秒
    def fun3():
        time.sleep(6)
        print('hello python', time.ctime())
    
    th1 = threading.Thread(target=fun1)
    th2 = threading.Thread(target=fun2)
    th3 = threading.Thread(target=fun3)
    
    th1.start()
    th2.start()
    th3.start()

    打印结果:

    Thread-1 Mon Jan  7     11:01:52 2019
    Thread-2 Mon Jan  7     11:01:54 2019
    hello python Mon Jan  7 11:01:56 2019

    解析:从结果看出,他们同一时间 11:01:50开始执行,分别用了不同的时间结束

    接着往下看,添加join阻塞线程

    ''''''

    th1.start() th1.join() th2.start() th2.join() th3.start() th3.join()

    打印结果:

    Thread-1 Mon Jan 7     11:19:00 2019
    Thread-2 Mon Jan 7     11:19:04 2019
    hello python Mon Jan 7 11:19:10 2019

    我们看到这三线程按顺序依次执行。

    我们接着看看线程的方法使用:

    threading.enumerate()            #列举线程,返回列表,其中里面会有一条主线程
    threading.activeCount()          #查看线程运行个数
    threading.current_thread().name     #查看当前运行线程名称
    join()                      #阻塞线程运行

    我们接着看第二种开线程的方式:

    import threading
    import time
    
    class MyThread(threading.Thread):
        def run(self):
            for i in range(3):
                time.sleep(1)
                msg = "I'm "+self.name+' @ '+str(i) #name属性中保存的是当前线程的名字
                print(msg)
    
    if __name__ == '__main__':
        t = MyThread()
        t.setName('yangzhenyu')
        a = t.isAlive()
        print(a)
        print(t.getName())
    
        t.start()
        b = t.isAlive()
        print(b)

    打印结果:

    False
    yanzghenyu
    True
    I'm yanzghenyu @ 0
    I'm yanzghenyu @ 1
    I'm yanzghenyu @ 2

    方法总结:

    t.setName()    #设置运行线程名称,不指定默认Thread-1 
    t.getName()    #获取线程名称
    t.isAlive()        #判断线程是否运行,返回布尔类型   

    线程间共享全局变量:

    import threading
    import time
    
    n = 100
    
    def work01():
        global n
        for i in range(3):
            n += 1
        print(n)                          //103
    
    def work02():
        global n
        print(n)                         //103
    
    print(n)                             //100
    
    t1 = threading.Thread(target=work01)
    t1.start()
    time.sleep(
    1)
    t2
    = threading.Thread(target=work02) t2.start()

    关于线程锁 

    1. 用threading.Lock()创建锁,用acquire()申请锁,每次只有一个线程获得锁,其他线程必须等此线程release()后才能获得锁
    2. RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。
    3. 注意:如果使用RLock,那么acquire和release必须成对出现,即同一线程中调用了n次acquire,必须调用n次的release才能真正释放所占用的琐

    下面例子中我们用到的是Lock(),当加完锁之后,该方法同一时间内只能被一个线程调用。

    import threading
    mylock=threading.Lock()#创建锁
    num = 0
    def add_num(name):
        global num
        while True:
            mylock.acquire()#申请锁也就是加锁
            print('thread %s locked! num=%d'%(name,num))
            if num>=5:
                print('thread %s release! num=%d'%(name,num))
                mylock.release()#释放锁
                return 
            num += 1
            print('thread %s release! num = %d'%(name,num))
            mylock.release()
    
    t1 = threading.Thread(target=add_num,args=('A',))
    t2 = threading.Thread(target=add_num,args=('B',))
    t1.start()
    t2.start()

    打印结果:

    thread A locked! num=0
    thread A release! num = 1
    thread A locked! num=1
    thread A release! num = 2
    thread A locked! num=2
    thread A release! num = 3
    thread A locked! num=3
    thread A release! num = 4
    thread A locked! num=4
    thread A release! num = 5
    thread A locked! num=5
    thread A release! num=5
    thread B locked! num=5
    thread B release! num=5

     

    cpu io密集型适合用多线程进行开发



    关于进程:

    • 进程是系统进行资源分配的最小单位,每个进程都有自己的独立内存空间,不用进程通过进程间通信来通信。
    • 但是进程占据独立空间,比较重量级,所以上下文进程间的切换开销比较大,但是比较稳定安全。

    进程创建:

    第一种创建进程的方式:

    from multiprocessing import Process
    import time
    import random
    import os
    
    
    def piao(name):
        print("%s is piaoping"%name)
        time.sleep(random.randint(0,1))
        print("%s is piao end"%name)
    
    if __name__ == '__main__':
        print("CPU的个数是:%d"%os.cpu_count())
        p1 = Process(target=piao,args=("alex",),name="进程1")
        print(p1.name)
        p1.start()
        print("父进程!") #执行速度要远快于建立新进程的时间

    打印结果:

    CPU的个数是:2
    进程1
    父进程!
    alex is piaoping
    alex is piao end

    第二种创建进程的方式:

    from multiprocessing import Process
    import time
    import random
    
    #继承Process类,并实现自己的run方法
    
    class Piao(Process):
        def __init__(self,name):
            #必须调用父类的init方法
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s is piaoing"%self.name)
            time.sleep(random.randint(1,3))
            print("%s is piaoeng"%self.name)
    
    
    if __name__ == '__main__':
        p1 = Piao("Alex")
        #开辟一个新的进程实际上就是执行本进程所对应的run()方法
        p1.start()
        print("主进程!")

    结果:

    主进程!
    Alex is piaoing
    Alex is piaoeng

    解析:join括号中不携带参数,表示父进程在这个位置要等待p1进程执行完成后,如果指定参数,也就是等待时间s,那么主进程将在这个时间内结束,

         用is_active()  方法即可检测进程的状态,不加join() 返回True,表示进程还在进行。

    进程的方法,

    start()    启动进程实例(创建子进程);
    terminate():不管任务是否完成,立即终止;
    name:    当前进程实例别名,默认为Process-N,N为从1开始递增的整数;
    pid:     当前进程实例的PID值;  os.getpid()
    is_alive(): 判断进程实例是否还在执行;
    join([timeout]):是否等待进程实例执行结束,或等待多少秒;

    进程池:

      在程序实际处理问题时,忙时会有成千上万个任务需要执行,闲时有零星任务,创建时需要消耗时间,销毁也需要时间,

    即使开启成千上万个进程,操作系统也不能 让他同时执行。这里就用到了进程池,用于管理小块内存的申请与释放。

    ,

    1,上代码:

    from multiprocessing.pool import Pool
    from time import sleep
    
    
    def fun(a):
        sleep(1)
        print(a)
    
    if __name__ == '__main__':
        p = Pool()   # 这里不加参数,但是进程池的默认大小,等于电脑CPU的核数
                # 也是创建子进程的个数,也是每次打印的数字的个数
        for i in range(10):
            p.apply_async(fun, args=(i,))
    p.close() p.join()
    # 等待所有子进程结束,再往后执行 print("end")

    2,callback 举例:

    from multiprocessing import Process,Pool
    
    
    def func(i):
        i+=1
        return i#普通进程处理过的数据返回给主进程p1
    
    def call_back(p1):
        p1+=1
        print(p1)
    
    if __name__ == '__main__':
        p = Pool()
        for i in range(10):
            p1 = p.apply_async(func,args=(i,),callback = call_back)#p调用普通进程并且接受其返回值,将返回值给要执行的回调函数处理
        p.close()
        p.join()

    解析:   1,p.apply ( func,args = ())     同步的效率,也就是说池中的进程一个一个的去执行任务

          p.apply_async( func,args = () , callback = None) : 异步的效率,也就是池中的进程一次性都去执行任务.

        2,异步处理任务时 : 必须要加上close和join. 进程池的所有进程都是守护进程(主进程代码执行结束,守护进程就结束). 

        3,func : 进程池中的进程执行的任务函数

           4,args : 可迭代对象性的参数,是传给任务函数的参数

           5,callback : 回调函数,就是每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,

                    由回调函数进行进一步处理,回调函数只异步才有,同步没有.回调函数是父进程调用.

    3. map( func,iterable)  (该方法经常用到爬虫)

    from multiprocessing import Pool
    
    def func(num):
        num += 1
        print(num)
        return num
    
    if __name__ == '__main__':
        p = Pool(2)
        res = p.map(func,[i for i in range(100)])
        # p.close()#map方法自带这两种功能
        # p.join()
        print('主进程中map的返回值',res)

    func : 进程池中的进程执行的任务函数

    iterable : 可迭代对象,是把可迭代对象那个中的每个元素一次传给任务函数当参数.

    map方法自带close和join

    进程间的通信:

    1)队列

    from multiprocessing import Queue,Process
    import os,time,random
    
    #添加数据函数
    def proc_write(queue,urls):
        print("进程(%s)正在写入..."%(os.getpid()))
        for url in urls:
            queue.put(url)
            print("%s被写入到队列中"%(url))
            time.sleep(random.random()*3)
    
    #读取数据函数
    def proc_read(queue):
        print("进程(%s)正在读取..."%(os.getpid()))
    
        while True:
            url = queue.get()
            print("从队列中提取到:%s"%(url))
    
    if __name__ =="__main__":
    queue
    = Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()

    生产者与消费者模式(线程间的通信):

    from queue import Queue
    import threading,time
    
    
    class Producer(threading.Thread):
        def run(self):
            global queue
            count = 0
            while True:
                if queue.qsize() < 1000:
                    for i in range(100):
                        count = count +1
                        msg = '生成产品'+str(count)
                        queue.put(msg)
                        print(msg)
                time.sleep(0.5)
    
    class Consumer(threading.Thread):
        def run(self):
            global queue
            while True:
                if queue.qsize() > 100:
                    for i in range(3):
                        msg = self.name + '消费了 '+queue.get()
                        print(msg)
                time.sleep(1)
    
    
    if __name__ == '__main__':
        queue = Queue()
    
        for i in range(500):
            queue.put('初始产品'+str(i))
        for i in range(2):
            p = Producer()
            p.start()
        for i in range(5):
            c = Consumer()
            c.start()

    2) 进程间的通信(管道)

    from multiprocessing import Pipe,Process
    import random,time,os
    
    def proc_send(pipe,urls):
        for url in urls:
            print("进程(%s)发送:%s"%(os.getpid(),url))
            pipe.send(url)
            time.sleep(random.random())
    
    def proc_recv(pipe):
        while True:
            print("进程(%s)接收到:%s"%(os.getpid(),pipe.recv()))
            time.sleep(random.random())
    
    if __name__ == "__main__":
        pipe = Pipe()
        p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],))   
        p2 = Process(target=proc_recv,args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.terminate()

    解析:

      pipe用于两个进程间的通信,两个进程分别位于管道的两端,Pipe方法返回(conn1,conn2)代表一个管道的两端,

      Pipe方法有dumplex参数,若该参数为True,管道为全双工模式,

      若为Fasle,conn1只负责接收消息,conn2只负责发送消息.send和recv方法分别是发送和接收消息的方法



    协程:

    协程:是更小的执行单位,是一种轻量级的线程,协程的切换只是单纯的操作CPU的上下文,所以切换速度特别快,且耗能小。

    gevent是第三方库,通过greenlet实现协程,其基本思想是:

    当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

    由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

    from gevent import monkey
    
    monkey.patch_all()  # 用来在运行时动态修改已有的代码,而不需要修改原始代码。
    
    import gevent
    import requests
    
    
    def f(url):
        print('GET: %s' % url)
        html = requests.get(url).text
        print(url, len(html))
    
    
    gevent.joinall([
        gevent.spawn(f, 'http://i.maxthon.cn/'),  # 先执行这个函数,发送请求,等待的时候发送第二个请求
        gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'),
        gevent.spawn(f, 'http://edu.51cto.com/?jydh')])

    运行结果:

    GET: http://i.maxthon.cn/
    GET: http://www.jianshu.com/u/3cfeb3395a95
    GET: http://edu.51cto.com/?jydh
    http://i.maxthon.cn/ 461786
    http://edu.51cto.com/?jydh 353858
    http://www.jianshu.com/u/3cfeb3395a95 597

    从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。

    使用gevent,可以获得极高的并发性能,但gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。

  • 相关阅读:
    Chrome cookies folder
    Fat URLs Client Identification
    User Login Client Identification
    Client IP Address Client Identification
    HTTP Headers Client Identification
    The Personal Touch Client Identification 个性化接触 客户识别
    购物车 cookie session
    购物车删除商品,总价变化 innerHTML = ''并没有删除节点,内容仍存在
    453
    购物车-删除单行商品-HTMLTableElement.deleteRow()
  • 原文地址:https://www.cnblogs.com/lvye001/p/10232377.html
Copyright © 2011-2022 走看看