  • [Python Part 8] Threads, Processes, and Coroutines

    Contents

    1. Threads
      1. threading syntax
      2. Thread locks: Lock, RLock, Semaphore
      3. Timer
      4. Event
      5. queue
    2. Processes
      1. multiprocessing syntax
      2. Inter-process communication
      3. Process pools
    3. Coroutines

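    Python Threads

    A thread is the smallest unit of work in a running program. It is contained in a process and is the unit that actually carries out the process's work. A thread is a single sequential flow of control within a process; one process can run several threads concurrently, each performing a different task.

    The Python threading module

    A thread can be created in two ways:

    Direct invocation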

    import threading
    import time

    def sayhi(num):  # function that each thread will run
        print("running on number:%s" % num)
        time.sleep(3)

    if __name__ == '__main__':
        t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
        t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance

        t1.start()  # start the thread
        t2.start()  # start the other thread

        print(t1.name)  # thread name (getName() is deprecated since Python 3.10)
        print(t2.name)

    Subclass-based invocation

    import threading
    import time

    class MyThread(threading.Thread):
        def __init__(self, num):
            super().__init__()
            self.num = num

        def run(self):  # code that each thread will run
            print("running on number:%s" % self.num)
            time.sleep(3)

    if __name__ == '__main__':
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()

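    Other methods:

      • start        marks the thread as ready to run; it then waits for the CPU to schedule it
      • setName      sets the thread's name (deprecated since Python 3.10; assign to the name attribute instead)
      • getName      returns the thread's name (deprecated since Python 3.10; read the name attribute instead)
      • setDaemon    marks the thread as a daemon (background) thread or a non-daemon (foreground) thread, the default (deprecated since Python 3.10; set the daemon attribute instead)
                     A daemon thread runs alongside the main thread, but once the main thread and all other non-daemon threads have finished, it is stopped whether or not its work is complete
                     A non-daemon thread also runs alongside the main thread, and after the main thread finishes the program waits for it to complete before exiting
      • join         blocks the calling thread until the target thread finishes. Calling join right after each start runs the threads one at a time and defeats the purpose of multithreading; start all threads first, then join them
      • run          the method the thread executes once it is scheduled; start arranges for run to be called in the new thread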
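
    As a rough illustration of the methods listed above, the sketch below starts one thread, names it, marks it as a daemon, and waits for it with join(). The worker function, the results list, and the name "worker-1" are invented for the example.

```python
import threading
import time

results = []

def worker(delay):
    # Simulate some work, then record which thread did it.
    time.sleep(delay)
    results.append(threading.current_thread().name)

# The name= and daemon= arguments play the role of setName() and setDaemon().
t = threading.Thread(target=worker, args=(0.1,), name="worker-1", daemon=True)
t.start()

# join() blocks the caller until t finishes. Without it, the main thread
# could exit first and the daemon thread would be stopped mid-work.
t.join()

print(t.name, t.daemon, t.is_alive())
print(results)
```

    Because join() is called only once, after start(), the thread still runs concurrently with whatever the main thread does in between.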

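    Thread lock (Lock)

    Threads are scheduled unpredictably, and a thread may be switched out after executing only a few instructions. When several threads modify the same piece of data at the same time, the result can be corrupted (dirty data). A lock therefore allows only one thread at a time to perform the protected operation.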

    import time
    import threading

    def addNum():
        global num  # every thread works on the same global variable
        print('--get num:', num)
        time.sleep(1)
        lock.acquire()  # take the lock before modifying the data
        num -= 1  # decrement the shared variable (despite the function's name)
        lock.release()  # release the lock once the change is made

    num = 100  # shared variable
    thread_list = []
    lock = threading.Lock()  # global lock
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # wait for all threads to finish
        t.join()

    print('final num:', num)
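
    The same locking pattern can also be written with a with statement, which releases the lock even if the protected code raises an exception. This is a minimal sketch; the counter variable and the increment function are hypothetical names chosen for the example.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        # "with lock" acquires on entry and releases on exit, even on error.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("final counter:", counter)
```

    With the lock in place, every one of the 4 x 10,000 increments is applied, so the final value is always 40000.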

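    RLock (reentrant lock)

    Put simply, an RLock lets the thread that already holds the lock acquire it again, so code holding an outer lock can take the same lock inside a nested call. Each acquire must be matched by a release.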

    import threading
    import time

    gl_num = 0

    lock = threading.RLock()

    def Func():
        global gl_num
        lock.acquire()
        gl_num += 1
        time.sleep(1)
        print(gl_num)
        lock.release()

    for i in range(10):
        t = threading.Thread(target=Func)
        t.start()
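
    To show where reentrancy matters, here is a small sketch in which a function that holds the lock calls another function that takes the same lock. The names balance, deposit, and deposit_twice are made up for the illustration.

```python
import threading

rlock = threading.RLock()
balance = 0

def deposit(amount):
    global balance
    with rlock:
        balance += amount

def deposit_twice(amount):
    # The outer "with" already holds rlock; deposit() acquires it again.
    # With a plain threading.Lock this second acquire would block forever.
    with rlock:
        deposit(amount)
        deposit(amount)

threads = [threading.Thread(target=deposit_twice, args=(5,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("balance:", balance)
```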
    

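    Semaphore

    A mutex lets only one thread modify data at a time, whereas a Semaphore lets up to a fixed number of threads do so at once. Think of a restroom with three stalls: at most three people can use it at the same time, and everyone else waits until someone comes out.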

    import threading, time

    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s" % n)
        semaphore.release()

    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5)  # at most 5 threads may run at once
        for i in range(20):
            t = threading.Thread(target=run, args=(i,))
            t.start()
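
    One way to check that a semaphore really caps concurrency is to count how many threads are inside the guarded region at once. The sketch below does this with a limit of 3; the active and peak counters and the state_lock protecting them are assumptions added for the demonstration.

```python
import threading
import time

semaphore = threading.BoundedSemaphore(3)  # at most 3 threads inside at once
state_lock = threading.Lock()              # protects the two counters below
active = 0
peak = 0

def run(n):
    global active, peak
    with semaphore:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated work while holding a semaphore slot
        with state_lock:
            active -= 1

threads = [threading.Thread(target=run, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)
```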
    

    Timer

    A Timer runs an operation after a delay of n seconds.

    from threading import Timer
    
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 second, "hello, world" will be printed
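
    A Timer can also be cancelled before it fires. The following sketch starts two timers and cancels the second one; the fired list and the labels are hypothetical, and the short delays are chosen only to keep the example quick.

```python
import threading

fired = []

def hello(label):
    fired.append(label)

# This timer fires after 0.1 seconds.
t1 = threading.Timer(0.1, hello, args=("first",))
# This timer is cancelled before its 5-second delay elapses, so it never runs.
t2 = threading.Timer(5, hello, args=("second",))

t1.start()
t2.start()
t2.cancel()

# Timer is a Thread subclass, so join() waits for each timer thread to end.
t1.join()
t2.join()

print(fired)
```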

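    Event

    A Python threading Event lets the main thread control when other threads run. It provides three main methods: set, wait, and clear.

    How it works: the Event holds an internal flag. If the flag is False, a call to event.wait blocks; if the flag is True, event.wait returns without blocking.

    • clear: sets the flag to False
    • set: sets the flag to True
    • wait: blocks until the flag is True
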
    import threading
    
    def do(event):
        print('start')
        event.wait()
        print('execute')
    
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
    
    event_obj.clear()
    inp = input('input:')
    if inp == 'true':
        event_obj.set()
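
    wait() also accepts a timeout and returns the flag's value, which is handy when a thread should not block forever. The sketch below records both outcomes; the ready and timed_out events and the log list are names invented for the example.

```python
import threading

ready = threading.Event()      # the flag the worker waits on
timed_out = threading.Event()  # tells the main thread the first wait is over
log = []

def waiter():
    # The flag is still False, so this wait times out and returns False.
    log.append(ready.wait(timeout=0.05))
    timed_out.set()
    # This wait returns True as soon as the main thread calls ready.set().
    log.append(ready.wait(timeout=5))

t = threading.Thread(target=waiter)
t.start()

timed_out.wait()  # make sure the first wait has finished before setting the flag
ready.set()
t.join()

print(log)
ready.clear()     # reset the flag to False
print(ready.is_set())
```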
    
    Event example: traffic light

    import threading, time

    event = threading.Event()

    def lighter():
        counter = 0
        event.set()  # start with a green light
        while True:
            if 5 < counter <= 10:  # switch to red
                event.clear()  # clear the flag
                print("\033[41;1m red light is on...\033[0m")
            elif counter > 10:
                event.set()  # switch back to green
                counter = 0
            else:
                print("\033[42;1m green light is on...\033[0m")
            time.sleep(1)
            counter += 1

    def car(name):
        while True:
            if event.is_set():  # green light
                print('%s is running...' % name)
                time.sleep(1)
            else:
                print('%s sees red light, waiting...' % name)
                event.wait()  # wait for the flag to be set
                print("\033[34;1m %s green light is on, start going...\033[0m" % name)

    light = threading.Thread(target=lighter)
    light.start()
    car1 = threading.Thread(target=car, args=('Audi',))
    car1.start()

    queue

    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

    class queue.Queue(maxsize=0) # first in, first out
    class queue.LifoQueue(maxsize=0) # last in, first out
    class queue.PriorityQueue(maxsize=0) # entries are retrieved in priority order

    Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

    The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
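
    The (priority_number, data) pattern described above can be sketched as follows. The task names are made up for the example.

```python
import queue

pq = queue.PriorityQueue()
# Entries are (priority_number, data) tuples; lower numbers come out first.
pq.put((3, "write report"))
pq.put((1, "fix outage"))
pq.put((2, "review patch"))

order = []
while not pq.empty():
    priority, task = pq.get()
    order.append(task)

print(order)  # ['fix outage', 'review patch', 'write report']
```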

    exception queue.Empty

    Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

    exception queue.Full

    Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

    Queue.qsize()
    Queue.empty() #return True if empty  
    Queue.full() # return True if full 
    Queue.put(item, block=True, timeout=None)

    Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

    Queue.put_nowait(item)

    Equivalent to put(item, False).

    Queue.get(block=True, timeout=None)

    Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

    Queue.get_nowait()

    Equivalent to get(False).
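
    The blocking and non-blocking behaviour described above can be tried out on a small bounded queue. This is only a sketch: the maxsize of 2 and the overflow, items, and drained variables are chosen for the illustration.

```python
import queue

q = queue.Queue(maxsize=2)
q.put_nowait("a")
q.put_nowait("b")

try:
    q.put_nowait("c")    # the queue already holds maxsize items
except queue.Full:
    overflow = True
else:
    overflow = False

items = [q.get_nowait(), q.get_nowait()]

try:
    q.get(timeout=0.05)  # nothing left: Empty is raised once the timeout expires
except queue.Empty:
    drained = True
else:
    drained = False

print(overflow, items, drained)
```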

    Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

    Queue.task_done()

    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

    Raises a ValueError if called more times than there were items placed in the queue.

    Queue.join() # blocks until every item in the queue has been retrieved and processed
    Queue example:
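    import threading, queue

    def producer():
        for i in range(10):
            q.put("bone %s" % i)
        print("waiting for all the bones to be taken...")
        q.join()  # blocks until task_done() has been called for every item
        print("all the bones have been taken...")

    def consumer(n):
        while True:
            print("%s got" % n, q.get())  # get() blocks until an item is available
            q.task_done()  # tell the queue this task is finished

    q = queue.Queue()
    p = threading.Thread(target=producer)
    p.start()
    # Run the consumer in a daemon thread so it cannot exit early on a
    # momentarily empty queue, and so it does not keep the program alive.
    c1 = threading.Thread(target=consumer, args=("Li Chen",), daemon=True)
    c1.start()
    p.join()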

    Python Processes

    import time
    import multiprocessing
    
    def run(name):
        time.sleep(1)
        print('hello',name)
    
    if __name__ == '__main__':
        for i in range(10):
            p = multiprocessing.Process(target=run,args=('chris %s' %i,))
            p.start()
    

    Note: each process keeps its own copy of its data, so creating a process is considerably more expensive than creating a thread.
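
    The fact that each process works on its own copy of the data can be seen in a small sketch like the one below. The counter list and the bump function are hypothetical names; the child process changes its own copy, and the parent's copy stays as it was.

```python
import multiprocessing

counter = [0]  # module-level data; every process gets its own copy

def bump():
    # Increment this process's copy of the counter and return the new value.
    counter[0] += 1
    return counter[0]

if __name__ == "__main__":
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    # The child's increment happened in the child's memory only.
    print("parent still sees:", counter[0])
```

    To make a change visible across processes, the data has to be sent back explicitly or placed in a shared object.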

    Inter-process communication

    Processes do not share memory. To exchange data between two processes, use one of the following mechanisms:

    Queues

    Usage is much the same as the queue module used with threads.

    import multiprocessing
    
    def fun(q2):
        q2.put(['chris',22])
    
    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=fun,args=(q,))
        p.start()
        print(q.get())
        p.join()
    

    Pipes

    import multiprocessing
    
    def fun(conn):
        conn.send([11,22,33])
        print('from parent:',conn.recv())
        conn.close()
    
    if __name__ == '__main__':
        parent_conn,child_conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=fun,args=(child_conn,))
        p.start()
        print(parent_conn.recv())
        parent_conn.send('I am the parent')
        p.join()
    

    Managers

    A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

    import multiprocessing
    import os
    
    def f(d,l):
        d['name'] = 'chris'
        d[os.getpid()] =os.getpid()
    
        l.append(os.getpid())
        print(l)
    
    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        d = manager.dict()
        l = manager.list(range(4))
        p_list = []
        for i in range(10):
            p = multiprocessing.Process(target=f,args=(d,l))
            p.start()
            p_list.append(p)
    
        for n in p_list:
            n.join()
    
        print(d)
        print(l)
    

    Lock

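    When several processes need to access a shared resource, a Lock can be used to prevent conflicting access.

    If two processes do not synchronize with a lock, their writes may end up interleaved.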

    import multiprocessing
    
    def f(l, i):
        l.acquire()
        print('hello world', i)
        l.release()
    
    if __name__ == '__main__':
        lock = multiprocessing.Lock()
    
        for num in range(10):
            multiprocessing.Process(target=f, args=(lock, num)).start()
    

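    Process pools

    A process pool maintains a set of worker processes. When a task is submitted, it is handed to a free worker from the pool; if no worker is free, the task waits until one becomes available.

    The pool offers two methods for submitting tasks:

    • apply: runs the task and blocks until it returns its result
    • apply_async: submits the task and returns immediately; an optional callback receives the result
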
    from multiprocessing import Pool
    import os
    import time

    def Foo(i):
        time.sleep(2)
        print('Foo process:', os.getpid())
        return i + 100

    def Bar(args):
        # The callback runs in the main process and receives Foo's return value
        print("Bar process:", args, os.getpid())

    if __name__ == '__main__':
        pool = Pool(3)
        print("main process", os.getpid())
        for i in range(5):
            #pool.apply(func=Foo,args=(i,))   # serial
            #pool.apply_async(func=Foo, args=(i,))  # parallel
            pool.apply_async(func=Foo,args=(i,),callback=Bar)  # parallel; callback is invoked with the result
        print('end')
        pool.close()  # close the pool first, then join
        pool.join()   # wait for the workers to finish; without this the program exits at once
    

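    Coroutines

    A coroutine is also called a micro-thread or fiber. In one sentence: a coroutine is a lightweight thread that lives in user space.

    A coroutine has its own register context and stack. When the scheduler switches away from a coroutine, it saves that register context and stack elsewhere; when it switches back, it restores them. Therefore:

    A coroutine retains the state of its previous invocation (that is, a particular combination of all its local state). Each re-entry resumes that state; in other words, execution continues from the point in the logic flow where it last left off.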
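
    Python's built-in generators give a simple way to see this "resume where it left off" behaviour. The sketch below uses a plain generator rather than greenlet or gevent, and the running_average function is a made-up example: its local variables survive between resumptions.

```python
def running_average():
    # Local state (total, count) is kept between resumptions.
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here; resume when send() is called
        total += value
        count += 1
        average = total / count

avg = running_average()
next(avg)  # run up to the first yield
outputs = [avg.send(10), avg.send(20), avg.send(60)]
print(outputs)  # [10.0, 15.0, 30.0]
```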

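    Advantages of coroutines:

    • No thread context-switch overhead
    • No overhead for locking and synchronizing atomic operations
      •   "An atomic operation needs no synchronization": an atomic operation is one that the thread scheduler cannot interrupt. Once it starts, it runs to completion without any context switch (a switch to another thread) in between. It may consist of one step or several, but the steps cannot be reordered or cut short so that only part of them runs. Being treated as an indivisible whole is the essence of atomicity.
    • Switching control flow is easy, which simplifies the programming model
    • High concurrency, high scalability, low cost: a single CPU can support tens of thousands of coroutines without trouble, which makes them well suited to high-concurrency workloads.

    Disadvantages:

    • Cannot use multiple cores: a coroutine is essentially single-threaded, so it cannot spread work across the cores of a CPU. Coroutines must be combined with processes to run on multiple CPUs. Most everyday applications do not need this unless they are CPU-bound.
    • A blocking operation (such as IO) blocks the entire program

    When to use coroutines: when a program spends much of its time on operations that do not need the CPU (IO).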

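    Conditions a coroutine implementation must satisfy:

    1. Concurrency must be achieved within a single thread
    2. Shared data can be modified without locks
    3. The user program itself keeps the context stacks of its multiple control flows
    4. When a coroutine hits an IO operation, control switches automatically to another coroutine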
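
    These conditions can be observed with the standard library's asyncio module, used here as a stand-in for the greenlet and gevent libraries covered next. In this sketch, asyncio.sleep simulates IO, and the fetch function, its names, and its delays are assumptions made for the example. Everything runs in one thread, and the shared events list is modified without a lock.

```python
import asyncio

events = []

async def fetch(name, delay):
    events.append(f"{name} start")
    # await hands control back to the event loop, which runs other coroutines
    # while this one waits (asyncio.sleep stands in for real IO here).
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return name

async def main():
    # gather() runs both coroutines concurrently and keeps the argument order
    # in its result list.
    return await asyncio.gather(fetch("slow", 0.2), fetch("fast", 0.05))

results = asyncio.run(main())
print(events)
print(results)
```

    The "fast" coroutine finishes while "slow" is still waiting, which shows control switching at the point where each coroutine would otherwise block.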

    greenlet

    from greenlet import greenlet
    
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    
    def test2():
        print(56)
        gr1.switch()
        print(78)
    
    gr1 = greenlet(test1) # create a greenlet; it starts running on the first switch()
    gr2 = greenlet(test2)
    gr1.switch()

    Gevent 

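    Gevent is a third-party library that makes concurrent synchronous or asynchronous programming straightforward. Its main building block is the Greenlet, a lightweight coroutine provided to Python as a C extension module. All Greenlets run inside the operating-system process of the main program, but they are scheduled cooperatively.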

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(2)
        print('Explicit context switch to foo again')
    
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(1)
        print('Implicit context switch back to bar')
    
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    

    Tasks switch automatically when one blocks on IO

    from gevent import monkey
    monkey.patch_all()  # patch the standard library before importing modules that use it

    import gevent
    from urllib.request import urlopen

    def f(url):
        print('GET: %s' % url)
        resp = urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))

    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])
    

    Handling multiple sockets concurrently in a single thread with gevent

    gevent_socket.py

    import gevent
    from gevent import socket, monkey

    monkey.patch_all()

    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)  # one greenlet per client

    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                if not data:  # empty bytes: the client closed the connection
                    break
                print("recv:", data)
                conn.send(data)
        except Exception as ex:
            print(ex)
        finally:
            conn.close()

    if __name__ == '__main__':
        server(8001)

    socket_client.py

    import socket

    HOST = 'localhost'  # The remote host
    PORT = 8001  # The same port as used by the server
    s = socket.socket()
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"), encoding="utf8")
        if not msg:  # empty input: stop and close the connection
            break
        s.sendall(msg)
        data = s.recv(1024)
        print('Received', data)

    s.close()

    Web crawler

    from gevent import monkey
    monkey.patch_all()  # patch blocking IO in the standard library so gevent can switch on it

    from urllib import request
    import gevent, time

    def f(url):
        print('GET: %s' % url)
        resp = request.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))

    urls = ['https://www.python.org/',
            'https://www.cnblogs.com/',
            'https://github.com/']

    # time_start = time.time()
    # for url in urls:
    #     f(url)
    # print("sync cost", time.time() - time_start)

    async_time_start = time.time()
    gevent.joinall([gevent.spawn(f, url) for url in urls])
    print("async cost", time.time() - async_time_start)
  • Original article: https://www.cnblogs.com/fuyefeng/p/7127683.html