zoukankan      html  css  js  c++  java
  • Python 线程 进程 协程

    线程

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

    threading模块

    python的标准库提供了两个模块用于多线程处理,_thread和threading,_thread是低级模块,threading是高级模块,是对_thread进行了封装。
    启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:
    线程有两种调用方式:直接调用和继承式调用
     
    直接调用
    import time
    import threading
    def sayhi(num): # 定义每隔线程都要运行的函数
        print('%s is say hi' %num)
    
        time.sleep(3)
    
    if __name__ == '__main__':
        t1 = threading.Thread(target = sayhi, args = [1, ]) # 调用Thread方法生成一个线程实例,第一个参数tartget表示进程要执行的函数,args表示要传递给进程函数的参数
        t2 = threading.Thread(target = sayhi, args = [2, ])
        t1.start() # 启动线程
        t2.start()
        t1.join() # 等待子线程完毕,这句话的意思就等待一个线程执行完在执行这句话后面的逻辑,join方法还可以接收一个超时时间参数,表示最多等待多长时间,超过这个时间就不等了,继续执行下面的语句,注意,是不等待,不是中断线程的执行
        t2.join()
        print(t1.getName()) # getName()表示获取线程的名称,默认Thread-1、Thread-2...这种命名方式
        print(t2.getName())

    继承式调用

    import threading
    import time
    class Mythreading(threading.Thread): ''' 定义一个类,继承自threading.Thread ''' def __init__(self, num): ''' 初始化方法 :param num: :return: ''' threading.Thread.__init__(self) self.num = num def run(self): ''' 重写run方法,也就是每个线程要执行的函数 :return: ''' print('%s is say hi' %self.num) time.sleep(5) if __name__ == '__main__': t1 = Mythreading(1) # 用刚才定义的类创建进程对象 t2 = Mythreading(2) t1.start() t2.start() t1.join() t2.join() print(t1.getName()) print(t2.getName())

    守护线程

    守护线程的方式,是的主线程执行完毕强制结束下面的子线程的运行。

    在python中,主线程和子线程是并行执行的,主线程会等待子线程执行完毕后再退出(主线程在启动子线程后会继续向下执行,并不等待子线程)。

    import time
    import threading
     
    def child(n):
        '''
        子线程执行的函数
        :param n:
        :return:
        '''
        print('[%s]------running----
    ' % n)
        time.sleep(2)
        print('--done--')
     
    def main():
        '''
        主线程要执行的函数
        :return:
        '''
        for i in range(2): # 循环生成2个子线程
            t = threading.Thread(target=child,args=[i,])
            t.start()
            print('starting thread', t.getName())
     
     
    m = threading.Thread(target=main,args=[]) # 创建主线程对象
    m.setDaemon(True) #将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务
    m.start() # 启动主线程
    m.join() # 阻塞等待主线程执行完毕,这里不起作用
    #time.sleep(3)
    print("---main thread done----")
    线程方法
        start        线程准备就绪,等待CPU调度
        setName      为线程设置名称
        getName      获取线程名称
        setDaemon    设置为后台线程或前台线程(默认)
                        如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                        如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
        join         逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
        run          线程被cpu调度后自动执行线程对象的run方法

    线程锁(互斥锁Mutex)

    import threading
    import time
    def addNum():
        global num # 调用全局变量num
        print('--get num:', num)
        time.sleep(1)
        lock.acquire() #申请锁
        num += 1 # 每个线程都对num进行加1操作
        lock.release() #释放锁 
        print(num)
     
    if __name__ == '__main__':
        lock = threading.Lock()
        num = 0
        thread_list = [] # 初始化一个线程列表
        for i in range(10000): # 循环启动10000个进程
            t = threading.Thread(target = addNum)
            t.start()
            thread_list.append(t) # 加入到线程列表中
        for t in thread_list: # 循环等待线程列表里的所有线程结束
            t.join()
        print(num) # 打印num的最终值

    python3.x已经修复了这个问题,不加锁结果也是正确的

     
    RLock 递归锁

    当一个大锁中还要再包含子锁的时候,如果再用threading.Lock的话,程序锁和钥匙会出现对不上的情况,这时候就需要用到递归锁

    import threading,time
     
    def run1():
        print("grab the first part data")
        lock.acquire()
        global num
        num +=1
        lock.release()
        return num
    def run2():
        print("grab the second part data")
        lock.acquire()
        global  num2
        num2+=1
        lock.release()
        return num2
    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res,res2)
     
     
    if __name__ == '__main__':
     
        num,num2 = 0,0
        lock = threading.RLock()
        for i in range(10):
            t = threading.Thread(target=run3)
            t.start()
     
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num,num2)
    View Code

    信号量(Semaphore)

    刚才说的锁也是互斥锁,同时只能有一个线程操作,而Semaphore可以同时允许一定数量的线程更改数据。

    import threading,time
      
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s
    " %n)
        semaphore.release()
      
    if __name__ == '__main__':
      
        num= 0
        semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
        for i in range(20):
            t = threading.Thread(target=run,args=(i,))
            t.start()
      
    while threading.active_count() != 1:
        pass #print threading.active_count()
    else:
        print('----all threads done---')
        print(num)
    View Code

    Events

    事件可以理解为就是一个信号,他只有两个状态可以理解为真和假,常用方法如下

        set():相当于设置为真
    
        clear():相当于设置为假
    
        isSet():判断是否为真
    
        wait():等待事件置为真(阻塞)
    View Code

    通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

    import threading
    def light():
        '''
        信号灯进程的线程执行的函数
        :return:
        '''
        import time
        if not event.isSet(): # 判断是否为真,如果不为真就设置为真,也就是一上来是绿灯
            event.set()
        count = 0 # 初始化计数器,可以理解为红绿灯之间的等待时间
        while True: # 无限制循环下去
            if count < 10:
                # 0-10秒是绿灯
                print('33[42;1m--green light on--33[0m')
            elif count < 13:
                # 10-12是黄灯
                print('33[43;1m--yellow light on--33[0m')
            elif count < 20:
                # 14-19秒是红灯
                if event.isSet(): # 判断如果为真,就设置成假的
                    event.clear()
                print('33[41;1m--red light on--33[0m')
            else:
                # 第20秒的时候计数器归零,并从新设置为真
                count = 0
                if not event.isSet():
                    event.set()
            time.sleep(1)
            count += 1 # 计数器加1
     
    def car(n):
        '''
        汽车进程要执行的函数
        :param n:
        :return:
        '''
        import time
        while True:
            time.sleep(1) # 每隔1秒检查一下红绿灯状态
            if event.isSet():
                # 如果为真就runing
                print('car [%s] is running...' %n)
            else:
                # 否则就waiting
                print('car [%s] is waiting for the red light...' %n)
                event.wait() # 等待事件变为真
     
    if __name__ == '__main__':
        event = threading.Event() # 创建Event对象
        Light = threading.Thread(target = light) # 创建信号灯线程
        Light.start() # 启动线程
        for i in range(3): # 循环创建3个汽车线程对象,并启动
            t = threading.Thread(target = car, args = [i,])
            t.start()
    View Code

    queue

    实现解耦、队列;先进先出,后进后出(当get不到数据时,会一直卡着等待数据)

    import queue
     
     
    q = queue.Queue()
    for i in range(10):
        q.put(i)
     
    for t in range(10):
        print(q.get())
         
    # 0
    # 1
    # 2
    # 3
    # 4
    # 5
    # 6
    # 7
    # 8
    # 9
    View Code
    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
    
    class queue.Queue(maxsize=0) #先入先出
    class queue.LifoQueue(maxsize=0) #last in fisrt out 
    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
    Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
    
    The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
    
    exception queue.Empty
    Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
    
    exception queue.Full
    Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
    
    Queue.qsize()
    Queue.empty() #return True if empty  
    Queue.full() # return True if full 
    Queue.put(item, block=True, timeout=None)
    Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
    
    Queue.put_nowait(item)
    Equivalent to put(item, False).
    
    Queue.get(block=True, timeout=None)
    Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
    
    Queue.get_nowait()
    Equivalent to get(False).
    
    Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
    
    Queue.task_done()
    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
    
    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
    
    Raises a ValueError if called more times than there were items placed in the queue.
    
    Queue.join() block直到queue被消费完毕
    
    更多..
    View Code

    Timer

    定时器,指定n秒后执行某操作

    from threading import Timer
     
     
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed

     

    进程

    程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

     

    multiprocessing模块

    multiprocessing模块提供了一个Process类来代表一个进程对象

    from multiprocessing import Process
    import os
     
     
    def run_proc(name):
        # 子进程要执行的函数
        print('Run child process %s (%s)...' % (name, os.getpid())) # os.getpid()表示获得当前进程的pid
     
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid()) # 打印父进程的pid
        p = Process(target=run_proc, args=('test',)) # 创建进程对象,参数结构和多线程一样
        print('Child process will start.')
        p.start() # 启动子进程
        p.join() # 阻塞等待子进程执行完毕
        print('Child process end.')
    View Code

     

    进程间通信

    Queue

    不同进程间内存是不共享,所以多进程不能像多线程一样通过全局变量(当然全局变量也是不提倡的),所以只能通过队列,多进程模块也自带一个队列Queue,使用方法和threading里的queue差不多

    from multiprocessing import Process, Queue
     
    def f(q):
        q.put([42, None, 'hello'])
     
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    View Code
    Pipe

    管道,可以理解为两个进程之间的一个桥梁

    from multiprocessing import Process, Pipe
     
    def f(conn):
        conn.send([42, None, 'hello']) # 网管道里传递数据
        conn.close()
     
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe() # 一个是父进程的管道对象,一个是子进程的对象,自己成往里面send,父进程对象recv,有点像socket
        p = Process(target=f, args=(child_conn,)) # 把管道对象作为参数传递给子进程
        p.start()
        print(parent_conn.recv())   # 接收管道里的数据并打印出来
        p.join()
    View Code

    有人会说既然可以往子进程要执行的而函数传递参数,直接通过这个参数取子进程传递过来的数据就好了,比如可以用列表等可变数据类型(字符串和数值型等不可变类型的数据,想都不要想,统一进程都做不到)为啥还用管道或队列

    from multiprocessing import Process, Pipe
     
    def f(conn, strinfo):
        conn.send([42, None, 'hello']) # 网管道里传递数据
        conn.close() # 关闭管道
        strinfo.append('child')
     
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe() # 一个是父进程的管道对象,一个是子进程的对象,自己成往里面send,父进程对象recv,有点像socket
        strinfo = ['parent']
        p = Process(target=f, args=(child_conn, strinfo)) # 把管道对象作为参数传递给子进程
        p.start()
        print(parent_conn.recv())   # 接收管道里的数据并打印出来
        print(strinfo)
        p.join()
    View Code

    Managers 

    #Manager 进程间共享数据
     
    import multiprocessing
    import os
     
    def f(d,l):
        d["1"] = 1
        d["2"] = 2
        l.append(os.getpid())
     
     
    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        d = manager.dict()   #创建一个字典,进程间可以共享数据
        l = manager.list()
        p_list = []
        for i in range(10):
            p = multiprocessing.Process(target=f,args=(d,l,))
            p.start()
            p_list.append(p)
        for t in p_list:
            t.join()
     
        print(d)
        print(l)
     
    #输出
    # {'2': 2, '1': 1}
    # [516, 3628, 6076, 5020, 5396, 4752, 6072, 3608, 3704, 5124]
    View Code
    进程同步

    Without using the lock output from the different processes is liable to get all mixed up.

    from multiprocessing import Process, Lock
     
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
     
    if __name__ == '__main__':
        lock = Lock()
     
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    View Code

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程, 如果进程池序列没有可提供的进程,那么就会等待,知道有可用进程为止

    Pool模块有两种常用的启动进程的方法

    apply和apply_async,从字面上理解是apply_async是异步的,其实就是apply_async支持把一个函数作为参数传递进去,当进程函数执行完的时候可以通过return一个值,这个值,会自动作为参数传递个传递进来的函数,并执行该函数,我们称之为回调(callback)

    from  multiprocessing import Pool, freeze_support
    import time
     
    def Foo(i):
        '''
        子进程执行的函数
        :param i: 
        :return: 
        '''
        time.sleep(2)
        return i+100
     
    def Bar(arg):
        '''
        子进程回调函数
        :param arg:
        :return:
        '''
        print('-->exec done:',arg)
     
    if __name__ == '__main__': # 这个在windows环境中绝对不能省略否则会报错
        freeze_support()
        pool = Pool(5) # 创建进程池对象
     
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)
            # pool.apply(func=Foo, args=(i,))
        print('end')
        pool.close()
        pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

    协程

    协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

    协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

    协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

    协程的好处:

    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
    • "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
    • 方便切换控制流,简化编程模型
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

    缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    Gevent

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    import gevent
     
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
     
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
     
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

    遇到IO操作自动切换

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    from gevent import monkey
    monkey.patch_all()  #monkey.patch_all()执行后可以识别urllib里面的I/0操作
    import gevent
    import urllib.request
    
    def f(url):
        print('GET: %s' % url)
        resp = urllib.request.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])
    View Code

    通过gevent实现单线程下的多socket并发

    server side

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    import sys
    import time
    import gevent
    
    from gevent import socket, monkey
    
    #monkey.patch_all()
    
    
    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)
    
    
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv:", data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
        except Exception as ex:
            print(ex)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server(50007)
    View Code

    client side   

    import socket
     
    HOST = 'localhost'    # The remote host
    PORT = 50007         # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"),encoding="utf8")
        s.sendall(msg)
        data = s.recv(1024)
        #print(data)
     
        print('Received', repr(data))
    s.close()
    View Code

    并发100个sock连接

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    import socket
    import threading
    
    def sock_conn():
    
        client = socket.socket()
        client.connect(("localhost",50007))
        count = 0
        while True:
            client.send( ("hello %s" %count).encode("utf-8"))
            data = client.recv(1024)
            print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
            count +=1
        client.close()
    
    
    for i in range(1000):
        t = threading.Thread(target=sock_conn)
        t.start()
    View Code
  • 相关阅读:
    unordered_set
    树的所有实现
    各类算法模板
    单链表全部实现(绝对史上最完整 附例题)
    求最长回文子串
    无重复的最长子串
    秋叶集
    1451. 重新排列句子中的单词
    152. 乘积最大子数组
    JVM总结的部分内容
  • 原文地址:https://www.cnblogs.com/sxlnnnn/p/6375243.html
Copyright © 2011-2022 走看看