zoukankan      html  css  js  c++  java
  • 【Python】进程和线程

    前言

    1. 什么是“多任务”的操作系统?

    简单地说,就是操作系统可以同时运行多个任务。

    2. 单核CPU是如何执行多任务的?

    操作系统轮流让多个任务交替执行,任务1执行0.01s,切换到任务2,任务2执行0.01s,再切换到任务3,执行0.01s……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

    3. 如何通俗的解释进程和线程?

    对于操作系统来说,一个任务就是一个“进程(Process)”,例如打开一个浏览器就是启动了浏览器进程,打开了Word就启动了一个Word进程。但是启动Word可能不止同时干一件事,例如:可以同时进行打字、排版、检查等,在一个进程内部,要同时干多件事,就需要同时执行多个“子任务”,我们把进程内的这些“子任务”就称为“线程(Thread)”。由于每个进程至少要干一件事,所以,一个进程至少有一个线程。真正地同时执行多线程需要多核CPU才可能实现。

    4. 实现“多任务”的方式有哪些?

    • 多进程模式:启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务。
    • 多线程模式:启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务。
    • 多进程+多线程模式:启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。

    ## 多进程 ###1. multiprocessing是跨平台版本的多进程模块,multiprocessing模块提供了一个Process类来代表一个进程对象 ```python from multiprocessing import Process import os

    def run_proc(name):
    """子进程执行的代码

    :param name:
    :return:
    """
    print('Run child process %s (%s)...' % (name, os.getpid()))
    

    if name == 'main':
    print('Parent process %s.' % os.getpid())
    # 创建子进程时,只需要传入一个执行函数和函数的参数
    p = Process(target=run_proc, args=('test', ))
    print('Child process will start.')
    # 启动进程
    p.start()
    # 等待子进程结束后再继续往下运行,通常用于进程间的同步
    p.join()
    print('Child process end.')


    Parent process 17120.
    Child process will start.
    Run child process test (9316)...
    Child process end.

    <br/>
    ### 2. 如果要启动大量的子进程,可以用进程池的方式批量创建子进程
    ```python
    from multiprocessing import Pool
    import os
    import time
    import random
    
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start_time = time.time()
        time.sleep(random.random() * 3)
        end_time = time.time()
        print('Task %s run %0.2f seconds...' % (name, (end_time - start_time)))
    
    
    if __name__ == '__main__':
        print('Parent process %s...' % os.getpid())
        p = Pool(8)
        for i in range(9):
            p.apply_async(long_time_task, args=(i, ))
        print('Waiting for all subprocess done...')
        # 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
        p.close()
        p.join()
        print('All subprocess done...')
    
    ------------------------------------
    Parent process 872...
    Waiting for all subprocess done...
    Run task 0 (9048)...
    Run task 1 (12644)...
    Run task 2 (10024)...
    Run task 3 (6660)...
    Run task 4 (5736)...
    Run task 5 (8396)...
    Run task 6 (11776)...
    Run task 7 (7180)...
    Task 4 run 0.20 seconds...
    Run task 8 (5736)...
    Task 5 run 0.65 seconds...
    Task 6 run 0.71 seconds...
    Task 0 run 1.68 seconds...
    Task 3 run 1.74 seconds...
    Task 2 run 1.79 seconds...
    Task 1 run 1.87 seconds...
    Task 7 run 2.54 seconds...
    Task 8 run 2.93 seconds...
    All subprocess done...
    
    

    ### 3. subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出
    import subprocess
    
    print('$ nslookup www.python.org')
    r = subprocess.call(['nslookup', 'www.python.org'])
    print('Exit code..', r)
    
    ------------------------------------
    $ nslookup www.python.org
    Server:  localhost
    Address:  192.168.1.1
    
    Name:    dualstack.python.map.fastly.net
    Addresses:  2a04:4e42:36::223
    	  151.101.108.223
    Aliases:  www.python.org
    
    Exit code.. 0
    

    ### 4. Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据 ```python from multiprocessing import Process, Queue import os import time import random

    以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据

    写数据进程执行的代码

    def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
    print('Put %s to queue...' % value)
    q.put(value)
    time.sleep(random.random())

    读数据进程执行的代码

    def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
    value = q.get(True)
    print('Get %s from queue...' % value)

    if name == 'main':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q, ))
    pr = Process(target=read, args=(q, ))
    # 启动子进程pw,写入
    pw.start()
    # 启动子进程pr,读取
    pr.start()
    # 等待pw结束
    pw.join()
    # pr进程是死循环,无法等待其结束,只能强行终止
    pr.terminate()


    Process to write: 13676
    Process to read: 6688
    Put A to queue...
    Get A from queue...
    Put B to queue...
    Get B from queue...
    Put C to queue...
    Get C from queue...

    <br/>
    ### 小结
    * 在Unix/Linux下,可以使用fork()调用实现多进程。
    * 要实现跨平台的多进程,可以使用multiprocessing模块。
    * 进程间通信是通过Queue、Pipes等实现的。
    
    <br/>
    ## 多线程
    多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,
    而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
    ```python
    import time
    import threading
    
    
    # 多个线程同时操作一个变量的例子
    # 假定这是你的银行存款
    balance = 0
    
    # 创建一个锁
    lock = threading.Lock()
    
    
    def change_it(n):
        # 先存后取,结果应该为0
        global balance
        balance = balance + n
        balance = balance - n
    
    
    def run_thread_one(n):
        """未加锁
    
        :param n:
        :return:
        """
        for i in range(1000000):
            change_it(n)
    
    
    def run_thread_two(n):
        """加锁
    
        :param n:
        :return:
        """
        for i in range(1000000):
            # 先获取到锁
            # 当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止
            lock.acquire()
            try:
                change_it(n)
            finally:
                # 获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放
                # 用完了一定要释放锁
                lock.release()
    
    
    t1 = threading.Thread(target=run_thread_two, args=(5, ))
    t2 = threading.Thread(target=run_thread_two, args=(8, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
    

    ### 分析 定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0, 但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。 原因是因为高级语言的一条语句在CPU执行时是若干条语句。 究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了
    ### 解决 要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时, 该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。 由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。 创建一个锁就是通过threading.Lock()来实现。
    ### 总结 锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行, 坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。 其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
    ## ThreadLocal 在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。 但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。这时候,ThreadLocal的出现解决了这个问题。 ```python import threading

    创建全局ThreadLocal对象

    local_school = threading.local()

    def process_student():
    # 获取当前线程关联的student
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

    def process_thread(name):
    # 绑定ThreadLocal的student
    local_school.student = name
    process_student()

    t1 = threading.Thread(target=process_thread, args=('Ali', ), name='Thread-A')
    t2 = threading.Thread(target=process_thread, args=('Bob', ), name='Thread-B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    Hello, Ali (in Thread-A)
    Hello, Bob (in Thread-B)

    <br/>
    ### 总结
    * ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
    * 一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
    
    <br/>
    ## 进程 VS. 线程
    ### 进程和线程
    * 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。
    多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。
    * 多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。在Windows下,多线程的效率比多进程要高。
    
    <br/>
    ### 计算密集型和IO密集型
    * 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
    * 涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
    
    <br/>
    ### 异步IO
    * 现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。
    * 对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。
    
    <br/>
    ## 分布式进程
    Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。
    ```python
    # task_master.py
    # 举个例子
    # 如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
    import random
    import time
    import queue
    from multiprocessing.managers import BaseManager
    from multiprocessing import freeze_support
    
    # 任务个数
    task_number = 10
    
    # 发送任务的队列
    task_queue = queue.Queue(task_number)
    
    # 接收结果的队列
    result_queue = queue.Queue(task_number)
    
    
    def get_task():
        return task_queue
    
    
    def get_result():
        return result_queue
    
    
    # 从BaseManager继承的QueueManager
    class QueueManager(BaseManager):
        pass
    
    
    def test():
        # 把两个Queue都注册到网络上,callable参数关联了Queue对象
        QueueManager.register('get_task', callable=get_task)
        QueueManager.register('get_result', callable=get_result)
        # 绑定端口5000,设置验证码'qwer'
        manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'qwer')
        # 启动Queue
        manager.start()
        try:
            # 获得通过网络访问的Queue对象
            task = manager.get_task()
            result = manager.get_result()
            # 放几个任务进去
            for i in range(task_number):
                n = random.randint(0, 10000)
                print('Put task %d...' % n)
                task.put(n)
    
            # 每秒检测一次是否所有任务都被执行完
            while not result.full():
                time.sleep(1)
    
            # 从result队列读取结果
            print('Try get results...')
            for i in range(10):
                r = result.get(timeout=10)
                print('Result: %s' % r)
        except:
            print('Manager Error!')
        finally:
            manager.shutdown()
            print('master exit.')
    
    
    if __name__ == '__main__':
        # 缓解多进程
        freeze_support()
        test()
    
    ------------------------------------
    Put task 9248...
    Put task 5123...
    Put task 342...
    Put task 2535...
    Put task 8494...
    Put task 7938...
    Put task 5643...
    Put task 6313...
    Put task 2393...
    Put task 2168...
    Try get results...
    
    Result: 9248 * 9248 = 85525504
    Result: 5123 * 5123 = 26245129
    Result: 342 * 342 = 116964
    Result: 2535 * 2535 = 6426225
    Result: 8494 * 8494 = 72148036
    Result: 7938 * 7938 = 63011844
    Result: 5643 * 5643 = 31843449
    Result: 6313 * 6313 = 39853969
    Result: 2393 * 2393 = 5726449
    Result: 2168 * 2168 = 4700224
    master exit.
    
    # task_worker.py
    import time
    import queue
    import sys
    from multiprocessing.managers import BaseManager
    
    
    # 创建类似的QueueManager
    class QueueManager(BaseManager):
        pass
    
    
    # 由于这个QueueManager只从网络上获取Queue,所以注册时只能只能提供名字
    QueueManager.register('get_task')
    QueueManager.register('get_result')
    
    # 连接到服务器,也就是运行task_master.py的机器
    # 端口和验证码注意保持一致与task_master.py设置的完全一致
    connect = QueueManager(address=('127.0.0.1', 5000), authkey=b'qwer')
    
    try:
        # 从网络连接
        connect.connect()
    except:
        print('connect error!')
        sys.exit()
    
    # 获取Queue的对象
    task = connect.get_task()
    result = connect.get_result()
    
    # 从task队列取任务,并把结果写入result队列
    while not task.empty():
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    
    # 处理结束
    print('worker exit.')
    
    
    if __name__ == '__main__':
        pass
    
    ------------------------------------
    run task 9248 * 9248...
    run task 5123 * 5123...
    run task 342 * 342...
    run task 2535 * 2535...
    run task 8494 * 8494...
    run task 7938 * 7938...
    run task 5643 * 5643...
    run task 6313 * 6313...
    run task 2393 * 2393...
    run task 2168 * 2168...
    worker exit.
    
    
  • 相关阅读:
    关于Spring MVC跨域
    Linux(Debian) 上安装tomcat并注册服务开机自启动
    fastjson SerializerFeature详解
    Mysql group_concat函数被截断的问题
    eclipse中show whitespace characters显示代码空格,TAB,回车 导致代码乱恶心
    Oracle数据库之动态SQL
    tomcat日志报Invalid message received with signature的解决办法
    用HttpClient发送HTTPS请求报SSLException: Certificate for <域名> doesn't match any of the subject alternative names问题的解决
    高可用集群架构的演进
    database disk image is malformed
  • 原文地址:https://www.cnblogs.com/parzulpan/p/11959293.html
Copyright © 2011-2022 走看看