zoukankan      html  css  js  c++  java
  • 线程、进程、协程

    线程

    python 的 threading 模块提供了线程的相关操作,线程是应用程序中工作的最小单元。

    import time
    import threading
    
    
    def process(arg):
        time.sleep(1)
        print(arg)
    
    
    if __name__ == '__main__':
    
        for i in range(10):
            t = threading.Thread(target=process, args=(i,))
            t.start()

    通过 threading 模块实现多线程,本身需要至少 10s 的程序会很快执行完成

    start()
        # 线程准备就绪,等待 CPU 的调度
    setDaemon()
        # 为线程命名
    getName()
        # 获取线程名称
    setDaemon(True)
        # 设置是否为后台进程,默认为 False
        # 为 True 时,主线程执行时,后台线程也在执行,当主线程执行完成后,无论后台的子线程是否执行完成,都会被停止
        # 为 False 时,主线程为等待子线程执行完成后再中止
    join()
        # 逐个执行线程,执行完毕后继续向下执行,该方法使多线程失去了意义
    run()
        # 线程被 CPU 调度后自动执行线程对应的 run 方法

    创建多线程的两种方法

    import threading
    
    def f1(x):
        print(x)
    
    # 第一种方法:常用
    t = threading.Thread(target=f1, args=(1,))
    t.start() # t.start() 表示线程已经准备就绪,等待 CPU 调用, CPU 在调用的时候,其实就是调用 run() 方法
    
    # 第二种方法:
    class MyThread(threading.Thread):
        def __init__(self, func, args):
            self.func = func
            self.args = args
            super(MyThread, self).__init__()
    
        def run(self):
            self.func(self.args)
    
    obj = MyThread(f1, 123)
    obj.start()

    线程锁(Lock,RLock)

    threading 的 Lock 和 RLock 方法提供了线程锁的功能,同时只允许一个线程更改数据

    Lock 方法不支持多重锁

    RLock 方法支持多重锁,一般使用 RLock 方法

    由于线程之间的数据是共享的,当多个线程同时修改一个数据时,就会出现脏数据,此时就需要通过线程锁来让线程一个一个修改数据

    """
    创建一个全局变量 NUM = 10, 通过 func 函数每次自减 1,1s 后打印数据
    """
    
    import time
    import threading
    
    NUM = 10
    
    def func():
        # 修改全局变量
        global NUM
        NUM -= 1
        # 等待 1s 后输出 NUM
        time.sleep(1)
        print(NUM)
    
    # 创建 10 个线程执行 func 函数
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    
    
    ################ 输出结果 ################
    0
    0
    0
    0
    0
    0
    0
    0
    0
    0

    无线程锁时,输出的值全部都为 0,输出的结果是所有线程修改后的数据。

    通过线程锁,使数据同时只能让一个线程修改

    import time
    import threading
    
    NUM = 10
    
    # 创建锁
    lock = threading.RLock()
    
    def func():
        # 上锁
        lock.acquire()
    
        # 修改全局变量
        global NUM
        NUM -= 1
        # 等待 1s 后输出 NUM
        time.sleep(1)
        print(NUM)
    
        # 解锁
        lock.release()
    
    
    # 创建 10 个线程执行 func 函数
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    
    
    ################ 输出结果 ################
    9
    8
    7
    6
    5
    4
    3
    2
    1
    0

    信号量(Semaphore)

    同时允许多个线程

    import time
    import threading
    
    semaphore = threading.Semaphore(2)
    
    def func(x):
        semaphore.acquire()
        time.sleep(1)
        print(x)
        semaphore.release()
    
    # 创建 10 个线程执行 func 函数
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

    事件(Event)

    通过主线程控制子线程的执行,主要有三个方法 set,wait,clear

    事件处理会在全局定义一个 Flag,如果 Flag 为 False,那么当执行到 event.wait 时就会阻塞,当 Flag 为 True 是,event.wait 就不会阻塞。和红绿灯机制相似

    • claer 将 Flag 设置为 False
    • set 将 Flag 设置为 True
    • wait 阻塞
    import time
    import threading
    
    event = threading.Event()
    
    def func(x):
        event.wait()
        time.sleep(1)
        print(x)
        event.wait()
    
    
    
    # 创建 10 个线程执行 func 函数
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    # Flag 默认为 False
    inp = input('>')
    if inp:
        event.set()

    条件(Condition)

    线程默认等待,只有满足某个条件时,才会释放 n 个线程

    import threading
    
    condition = threading.Condition()
    
    def func(x):
        condition.acquire()
        condition.wait()
        print(x)
        condition.release()
    
    
    
    # 创建 10 个线程执行 func 函数
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    while True:
        inp = input('>')
        if inp:
            condition.acquire()
            # 释放 int(inp) 个线程
            condition.notify(int(inp))
            condition.release()

    定时器(Timer)

    指定 n 秒后执行某操作

    import threading
    
    def func():
        print("Hello World")
    
    
    t = threading.Timer(1, func)
    t.start()

    线程池

    使用 concurrent.futures 的 ThreadPoolExecutor 类实现线程池,但是如果获取返回值时会阻塞

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def f(x):
        time.sleep(1)
        print(x)
        # return x+100
    
    # 定义线程池的大小为 2
    pool = ThreadPoolExecutor(5)
    
    for i in range(20):
        r = pool.submit(f, i)
        # print(r.result()) # 获取返回值,如果获取返回值则会阻塞

    自定义线程池

    """
    在线程池初始化的时候就需要先创建线程池
    """
    import queue
    import threading
    
    
    class ThreadPool(object):
    
        def __init__(self, maxsize = 5):
            self.maxsize = maxsize
            self._queue = queue.Queue(self.maxsize)
            for i in range(self.maxsize):
                self._queue.put(threading.Thread)
    
        def get_request(self):
            return self._queue.get()
    
        def add_request(self):
            self._queue.put(threading.Thread)
    
    
    def func(x, pool):
        time.sleep(1)
        print(x)
        pool.add_request()
    
    pool = ThreadPool(2)
    
    for i in range(20):
        thread = pool.get_request()
        t = thread(target=func, args=(i, pool))
        t.start()
    自定义线程池一 
    """
    支持传函数、传参、传回调函数、立即终止所有线程、最大优点:线程的循环利用,节省时间和资源
    【来源网站 http://www.bkjia.com/Pythonjc/1135798.html】
    """
    
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    
    class ThreadPool(object):
    
        def __init__(self, max_num):
            self.q = queue.Queue()
            self.max_num = max_num
    
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
    
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread
            self.generate_list.append(current_thread)
    
            event = self.q.get()
            while event != StopEvent:
    
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    status = True
                except Exception as e:
                    status = False
                    result = e
    
                if callback is not None:
                    try:
                        callback(status, result)
                    except Exception as e:
                        pass
    
                if self.terminal: # False
                    event = StopEvent
                else:
                    with self.worker_state(self.free_list,current_thread):
                        event = self.q.get()
    
        @contextlib.contextmanager
        def worker_state(self,x,v):
            x.append(v)
            try:
                yield
            finally:
                x.remove(v)
    
        def close(self):
            num = len(self.generate_list)
            while num:
                self.q.put(StopEvent)
                num -= 1
    
        # 终止线程(清空队列)
        def terminate(self):
    
            self.terminal = True
    
            while self.generate_list:
                self.q.put(StopEvent)
            self.q.empty()
    
    
    def work(i):
        time.sleep(1)
        print(i)
    
    pool = ThreadPool(10)
    for item in range(50):
        pool.run(func=work, args=(item,))
    
    pool.close()
    自定义线程池二

    进程

    创建子进程需要使用 multiprocess 模块

    from multiprocessing import Process
    
    def f():
        print("Hello")
    
    for i in range(2):
        p = Process(target=f,)
        p.start()

    进程间数据共享

    进程间默认无数据共享,

    import time
    from multiprocessing import Process
    
    li = []
    
    def f(x):
        li.append(x)
        time.sleep(1)
        # 输出各自的结果
        print(li)
    
    for i in range(10):
        p = Process(target=f, args=(i,))
        p.start()

    创建 Array 时必须指定类型

    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    通过Array实现进程间数据共享
    通过Manager实现进程间数据共享
    from multiprocessing import Process, Queue
    
    def f():
    
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        q.put(1)
        q.put(2)
        q.put(3)
        q.put(4)
    
        for i in range(4):
            p = Process(target=f)
            p.start()
    通过Queue实现进程间数据共享

    进程池

    from multiprocessing import Process, Pool
    
    def f(x):
        time.sleep(2)
        print(x)
    
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(f,args=(i,))
    
    print('end')
    pool.close()
    pool.join()

    协程

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

    from greenlet import greenlet
    
    def func1():
        print("func1 11")
        gr2.switch()
        print("func1 22")
        gr2.switch()
    
    def func2():
        print("func2 11")
        gr1.switch()
        print("func2 22")
    
    if __name__ == '__main__':
        gr1 = greenlet(func1)
        gr2 = greenlet(func2)
        gr1.switch()
    通过greenlet实现
    import gevent
    
    def func1():
        print("func1 11")
        gevent.sleep()
        print("func1 22")
    
    
    def func2():
        print("func2 11")
        gevent.sleep()
        print("func2 22")
    
    if __name__ == '__main__':
        gevent.joinall([
            gevent.spawn(func1),
            gevent.spawn(func2),
        ])
    通过gevent实现

    请求 URL 实例:

    from gevent import monkey
    import gevent
    import requests
    
    monkey.patch_all()
    
    def geturl(url):
        print("GET: %s" % url)
    
        resp = requests.get(url)
        data = resp.text
        print("%s types received from %s" % (len(data), url))
    
    
    gevent.joinall([
        gevent.spawn(geturl, 'https://www.baidu.com'),
        gevent.spawn(geturl, 'https://www.python.org'),
        gevent.spawn(geturl, 'https://github.com'),
    ])

     用途

    多线程:用于 IO 密集型操作

    多进程:用于计算密集型操作

    协程:用于线程内部,解决 IO 等待

  • 相关阅读:
    ABP 源码分析目录
    vika维格表模板大赛落幕:3888元大礼包花落谁家?
    OKR太难定?模板大全:产品/研发/运营/销售/行政全都有
    菜鸟成长记(四)----- 实习总结与反思
    菜鸟成长记(三)----- 年终总结与感悟
    菜鸟成长记(二)
    自我激励
    承认
    书评-《当下的力量(珍藏版)》-埃克哈特&#183;托利
    关于吃苦
  • 原文地址:https://www.cnblogs.com/wenchong/p/5906966.html
Copyright © 2011-2022 走看看