zoukankan      html  css  js  c++  java
  • Python之路第十天,高级(2)-多线程,多进程,协程

    线程

    threading

    threading模块对象 描述
    Thread 表示一个线程的执行对象
    Lock 锁原语对象
    RLock 可重入锁对象,使单线程可再次获得已经获得了的锁(递归锁定)
    Condition 条件变量能让一个钱程停下来,等待其它线程满足了某个“条件”,如状态的改变或值的改变
    Event 通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活
    Semaphore 为等待锁的线程提供了一个类似“等候室”的结构
    BoundedSemaphore 与Semaphore类似,只是它不允许超过初始值
    Timer 写Thread类似,只是它要等待一段时间后才开始运行

    Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    #!/usr/bin/env python3
    
    import threading
    import time
      
    def show(arg):
        time.sleep(1)
        print('thread'+str(arg))
      
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()
      
    print('main thread stop')
    

    上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

    threading模块中Thread类的方法:

    函数 描述
    start() 开始线程的执行
    run() 定义线程的功能函数(一般会被子类重写)
    join(timeout=None) 程序挂起,直到线程结束,如果给了timeout,则最多阻塞timeout秒
    getName() 返回线程的名字
    setName() 设置线程的名字
    isAlive() 布尔标志,表示这个线程是否在运行中
    isDaemon() 返回线程的daemon标志
    setDaemon() 把线程的标志设置为daemonic(一定要在start()函数前调用)

    用Thread类,你可以用多种方法来创建线程。

    • 创建一个Thread类的实例,传给它一个函数;
    • 创建一个Thread类的实例,传递给它一个可调用的类对象;
    • 从Thread派生出一个子类,创建这个子类的实例。

    上面那一段代码用的是第一种方法,下面用第三种方法来创建线程

    自定义线程类:

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):
            """
            定义每个线程要运行的函数,执行start()方法的时候这个函数会自动执行
            :return: None
            """
    
            print("running on number:%s" % self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
    

    线程锁(LOCK,RLOCK)

    由于线程之间是进行随机调度,并且每个线程可能只执行n条之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

    未使用锁:

    import threading
    import time
    
    NUM = 0
    
    
    def func():
        global NUM
        time.sleep(1)
        NUM += 1
        print(NUM)
    
    for i in range(10):
        t = threading.Thread(target=func, args=())
        t.start()
    
    print('主线程结束')
    

    使用锁:

    import threading
    import time
    
    NUM = 0
    lock = threading.RLock()
    
    
    def func():
        lock.acquire()
        global NUM
        NUM += 1
        time.sleep(1)
        print(NUM)
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    

    条件(Condition)

    使得线程等待,只有满足某条件时,才释放n个线程

    import threading
    
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    
        while True:
            num = input('>')
            if num == 'q':
                break
            con.acquire()
            con.notify(int(num))
            con.release()
    

    事件(Event)

    Python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait()方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    import threading
    
    
    def do(event):
        print('start')
        event.wait()
        print('execute')
    
    
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
    
    event_obj.clear()
    var = input('请输入:')
    if var == 'true':
        event_obj.set()
    

    信号量(BoundedSemaphore)

    互斥锁 同时只允许一个线程更改数据,而BoundedSemaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    import threading,time
    
    
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s" % n)
        semaphore.release()
    
    if __name__ == '__main__':
    
        num = 0
        # 最多允许5个线程同时运行
        semaphore = threading.BoundedSemaphore(5)
        for i in range(20):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    

    定时器(Timer)

    定时器,指定n秒后执行某操作

    from threading import Timer
     
     
    def func():
        print("hello, world")
    
    # 一秒之后执行func函数
    t = Timer(1, func)
    t.start()
    

    进程

    Process类中的方法和Thread中的方法名一致

    先来一个例子:

    from multiprocessing import Process
    import threading
    import time
      
    def foo(num):
        print('say hi', num)
      
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()
    

    进程间的数据共享

    先来一个例子证明各个进程之前是无法共享数据的

    from multiprocessing import Process
    
    li = []
    
    
    def foo(num):
        li.append(num)
        print('say hi', li)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    
        print('ending', li)
    
    

    进程间的数据共享之Array

    此数组类似于C语言中的链表,数组中的类型对应表,和C语言的数据类型一致

    'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    
    
    from multiprocessing import Process
    from multiprocessing import Array
    
    
    def func(num, arg):
    
        arg[num] = num + 100
        for item in arg:
            print(item)
        print('================')
    
    if __name__ == "__main__":
        li = Array('i', 10)
        for i in range(10):
            p = Process(target=func, args=(i, li,))
            p.start()
    

    进程之间数据共享之dict

    from multiprocessing import Process, Manager
    
    
    def func(num, arg):
        arg[num] = 100 + num
        print(arg.values())
    
    if __name__ == '__main__':
        manage = Manager()
        dic = manage.dict()
        for i in range(2):
            p = Process(target=func, args=(i, dic))
            p.start()
            p.join()
    

    进程锁

    from multiprocessing import Process, Array, RLock
    
    
    def func(l, tmp, num):
        """
        将第0个数加100
        """
        l.acquire()
        tmp[0] = 100 + num
        for item in tmp:
            print(num, '----->', item)
        l.release()
    
    
    if __name__ == '__main__':
        lock = RLock()
        temp = Array('i', [11, 22, 33, 44])
    
        for i in range(20):
            p = Process(target=func, args=(lock, temp, i,))
            p.start()
    
    

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    from multiprocessing import Pool
    import time
    
    
    def foo(num):
        time.sleep(2)
        return num + 100
    
    
    def bar(arg):
        print(arg)
    
    
    # print pool.apply(Foo,(1,))
    # print pool.apply_async(func =Foo, args=(1,)).get()
    if __name__ == '__main__':
        pool = Pool(5)
        for i in range(10):
            pool.apply_async(func=foo, args=(i,), callback=bar)
        print('end')
        pool.close()
        # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        pool.join()
    

    协程

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

    from greenlet import greenlet
    
    
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    
    
    def test2():
        print(56)
        gr1.switch()
        print(78)
    
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()
    
    import gevent
    
    
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
    
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    

    遇到IO操作自动切换:

    from gevent import monkey; monkey.patch_all()
    import gevent
    import requests
    
    
    def f(url):
        print('GET: %s' % url)
        resp = requests.get(url)
        data = resp.text
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])
    
    
  • 相关阅读:
    Codeforces Round #592 (Div. 2)C. The Football Season(暴力,循环节)
    Educational Codeforces Round 72 (Rated for Div. 2)D. Coloring Edges(想法)
    扩展KMP
    poj 1699 Best Sequence(dfs)
    KMP(思路分析)
    poj 1950 Dessert(dfs)
    poj 3278 Catch That Cow(BFS)
    素数环(回溯)
    sort与qsort
    poj 1952 buy low buy lower(DP)
  • 原文地址:https://www.cnblogs.com/zhangxunan/p/5666894.html
Copyright © 2011-2022 走看看