zoukankan      html  css  js  c++  java
  • 九、进程与线程

    进程:一个任务就是一个进程(Process)

    线程:进程内的“子任务”称为线程(Thread)

    线程是最小的执行单元,而进程由至少一个线程组成。多进程和多线程的程序涉及到同步、数据共享的问题

    一、多进程

    • fork():调用一次,返回两次,把当前进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回,子进程永远返回0,父进程返回子进程的ID。 子进程需要调用 getppid() 拿到父进程的ID。(window下无法调用该函数)
       1 import os
       2 
       3 print('Process (%s) start...' % os.getpid())
       4 # Only works on Unix/Linux/Mac:
       5 pid = os.fork()
       6 if pid == 0:
       7     print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
       8 else:
       9     print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
      10 
      11 输出:
      12 Process (876) start...
      13 I (876) just created a child process (877).
      14 I am child process (877) and my parent is 876.
    • multiprocessing:跨平台版本的多进程模块,提供一个 Process 类来代表一个进程对象
       1 from multiprocessing import Process
       2 import os
       3 #子进程要执行的代码
       4 def run_proc(name):
       5     print('运行子进程中,名字:%s,进程ID:%s...' %(name,os.getpid()))
       6 
       7 if __name__ == '__main__':
       8     print('父进程ID:%s.'%os.getpid())
       9     p = Process(target=run_proc, args=('test',))    #创建一个Process实例;args为元组;
      10     print('开始执行子进程.')
      11     p.start()
      12     p.join()    #等待子进程结束后再继续往下运行,通常用于进程间的同步;
      13     print('结束子进程.')
      14     print(os.getpid())
      15     print(os.getppid())
      16 
      17 输出:
      18 父进程ID:1368.
      19 开始执行子进程.
      20 运行子进程中,名字:test,进程ID:8692...
      21 结束子进程.
      22 1368
      23 3324
    • Pool:进程池,批量创建子进程, Pool对象 调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close() ,调用 close() 之后就不能继续添加新的 Process 了,Pool的默认大小是CPU的核数。
      1 def apply(self, func, args=(), kwds={})函数,相当于‘func(* args,** kwds)’
       1 from multiprocessing import Pool
       2 import os,time,random
       3 
       4 def long_time_task(name):
       5     print('运行任务%s,ID:%s' % (name,os.getpid()))
       6 
       7     start = time.time()              #开始运行:0秒开始
       8     time.sleep(random.random()* 3)   #取一个0.0 - 1.0间的随机浮点数乘以3倍,来作为进程在此停留或者运行的时间
       9     end = time.time()                #经过time.sleep后的时间,相当于end-start = time.sleep(random.random()* 3) 的时间
      10 
      11     print('任务%s运行%0.2f秒。'%(name,(end-start)))
      12 
      13 if __name__ == '__main__':
      14     print('父进程ID:%s' % os.getpid())
      15     p = Pool(4)         # Pool的值在本机默认值为4,表示最多同时执行4个进程,后面的需等前面某个进程完成时才能继续执行;
      16     for i in range(5):
      17         # apply_async会阻塞进程直到返回数据才把进程释放,然后有空余的进程来执行下一个进程(不等于挂起进程)
      18         p.apply_async(long_time_task,args=(i,))
      19     print('等待所有子进程完成...')
      20     p.close()
      21     p.join()
      22     print('所有子进程完成。')
      23 
      24 输出:
      25 父进程ID:5048
      26 等待所有子进程完成...
      27 运行任务0,ID:6840
      28 运行任务1,ID:5232
      29 运行任务2,ID:4884
      30 运行任务3,ID:1900
      31 任务2运行0.24秒。
      32 运行任务4,ID:4884
      33 任务1运行1.10秒。
      34 任务4运行0.97秒。
      35 任务0运行2.08秒。
      36 任务3运行2.13秒。
      37 所有子进程完成。
    • 子进程
       1 import subprocess
       2 
       3 print('$ nslookup www.python.org')
       4 r = subprocess.call(['nslookup', 'www.python.org'])
       5 print('Exit code:', r)
       6 
       7 输出:
       8 $ nslookup www.python.org
       9 Server:        192.168.19.4
      10 Address:    192.168.19.4#53
      11 
      12 Non-authoritative answer:
      13 www.python.org    canonical name = python.map.fastly.net.
      14 Name:    python.map.fastly.net
      15 Address: 199.27.79.223
      16 
      17 Exit code: 0

      通过 communicate() 方法输入对子进程进行输入操作,父进程如果想要和子进程通过  communicate() 方法通信, subprocess.Popen() 里对应的参数必须是  subprocess.PIPE ,如果为默认值 None ,那么子进程使用和父进程相同的标准流文件

      下文相当于在命令行执行命令nslookup,然后手动输入:

      1 set q=mx
      2 python.org
      3 exit
       1 import subprocess
       2 
       3 print('$ nslookup')
       4 p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)    #默认为:=None
       5 output, err = p.communicate(b'set q=mx
      python.org
      exit
      ')
       6 print(output.decode('utf-8'))
       7 print('Exit code:', p.returncode)
       8 
       9 输出:
      10 $ nslookup
      11 Server:        192.168.19.4
      12 Address:    192.168.19.4#53
      13 
      14 Non-authoritative answer:
      15 python.org    mail exchanger = 50 mail.python.org.
      16 
      17 Authoritative answers can be found from:
      18 mail.python.org    internet address = 82.94.164.166
      19 mail.python.org    has AAAA address 2001:888:2000:d::a6
      20 
      21 
      22 Exit code: 0

       subprocess.Popen():创建并返回一个子进程,并在这个进程中执行指定的程序

      subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
      • args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串;
      • bufsize:控制 stdin, stdout, stderr 等参数指定的文件的缓冲,和打开文件的 open()函数中的参数 bufsize 含义相同;
      • executable:如果这个参数不是 None,将替代参数 args 作为可执行程序;
      • stdin:指定子进程的标准输入;
      • stdout:指定子进程的标准输出;
      • stderr:指定子进程的标准错误输出;
      • preexec_fn:默认是None,否则必须是一个函数或者可调用对象,在子进程中首先执行这个函数,然后再去执行为子进程指定的程序或Shell。
      • close_fds:布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件;
      • shell:布尔型变量,明确要求使用shell运行程序,与参数 executable 一同指定子进程运行在什么 Shell 中——如果executable=None 而 shell=True,则使用 /bin/sh 来执行 args 指定的程序;也就是说,Python首先起一个shell,再用这个shell来解释指定运行的命令。
      • cwd:代表路径的字符串,指定子进程运行的工作目录,要求这个目录必须存在;
      • env:字典,键和值都是为子进程定义环境变量的字符串;
      • universal_newline:布尔型变量,为 True 时,stdout 和 stderr 以通用换行(universal newline)模式打开,
      • startupinfo:见下一个参数;
      • creationfalgs:最后这两个参数是Windows中才有的参数,传递给Win32的CreateProcess API调用
    • 进程间的通信: Python的 multiprocessing 模块包装了底层的机制,提供了Queue 、 Pipes 等多种方式来交换数据
       1 #以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据
       2 from multiprocessing import Process,Queue
       3 import os,time,random
       4 
       5 #写数据进程执行的代码
       6 def write(q):
       7     print('Process to write:%s' % os.getpid())
       8     for value in ['A','B','C']:
       9         print('Put %s to queue...'% value)
      10         q.put(value)
      11         time.sleep(random.random())
      12 
      13 #读数据进程执行的代码
      14 def read(q):
      15     print('Process to read:%s' % os.getpid())
      16     while True:
      17         value = q.get(True)
      18         print('Get %s from queue.' % value)
      19 
      20 if __name__ == '__main__':
      21     #父进程创建Queue,并传给各个子进程
      22     q = Queue()
      23     pw = Process(target=write,args=(q,))
      24     pr = Process(target=read,args=(q,))
      25     #启动子进程pw,写入:
      26     pw.start()
      27     #启动子进程pr,读取:
      28     pr.start()
      29     #等待pw结束
      30     pw.join()
      31     #pr进程里是死循环,无法等待期结束,只能强行终止:
      32     pr.terminate()
      33 
      34 输出:
      35 Process to write:7148
      36 Put A to queue...
      37 Process to read:9040
      38 Get A from queue.
      39 Put B to queue...
      40 Get B from queue.
      41 Put C to queue...
      42 Get C from queue

    二、多线程

      线程包括开始、执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠) ——这种做法叫让步yielding。 

    使用两个模块:_thread:低级模块;Threading:高级模块

    启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行

    • Threading:进程默认就会启动一个线程,称为主线程 MainThread ,主线程又可以启动新的线程, current_thread() 函数永远返回当前线程的实例。子线程的名字在创建时指定,下文用 LoopThread 命名子线程。不起名字Python会自动给线程命名为Thread-1Thread-2...
       1 import time, threading
       2 
       3 # 新线程执行:
       4 def loop():
       5     print('thread %s is running...' % threading.current_thread().name)  #返回当前线程的实例
       6     n = 0
       7     while n < 5:
       8         n += 1
       9         print('thread %s >>> %s' % (threading.current_thread().name, n))
      10         time.sleep(1)
      11     print('thread %s ended.' % threading.current_thread().name)
      12 
      13 print('thread %s is running...' % threading.current_thread().name)  #返回主线程的实例:MainThread
      14 t = threading.Thread(target=loop, name='LoopThread')
      15 t.start()
      16 t.join()
      17 print('thread %s ended.' % threading.current_thread().name)
      18 输出:
      19 
      20 thread MainThread is running...
      21 thread LoopThread is running...
      22 thread LoopThread >>> 1
      23 thread LoopThread >>> 2
      24 thread LoopThread >>> 3
      25 thread LoopThread >>> 4
      26 thread LoopThread >>> 5
      27 thread LoopThread ended.
      28 thread MainThread ended.
    • Lock:多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改,通过 threading.Lock() 来实现执行单个线程,其它线程需等待被锁的线程释放后才能继续执行。防止多个线程同时运行时修改数据数据错误。  
       1 #不使用Look 的情况,当循环次数较多时,交替执行的线程其运行顺序会被改变,从而使结果发生改变
       2 import time,threading
       3 #假设银行存款
       4 balance = 0
       5 def change_it(n):
       6     global balance  #使用全局变量
       7     balance = balance + n
       8     balance = balance - n
       9 
      10 def run_thread(n):
      11     for i in range(100000):
      12         change_it(n)
      13 
      14 t1 = threading.Thread(target=run_thread,args=(4,))
      15 t2 = threading.Thread(target=run_thread,args=(8,))
      16 t1.start()
      17 t2.start()
      18 t1.join()
      19 t2.join()
      20 print(balance)

      理想运行结果为:t1执行后,再进行t2的执行。结果都为0;而线程由操作系统调用,执行顺序可能被改变:

       1 初始值 balance = 0
       2 #t1,t2同时执行
       3 t1: balance = balance + 4  # balance = 4
       4 t2: balance = balance + 8  # balance = 8
       5 
       6 t1: balance = balance - 4  # balance = 0
       7 #t2执行第二条命令时,balance的值为t1刚执行完的值:0
       8 t2: balance = balance - 8  # balance = 0 - 8 = -8
       9 t2: balance = -8
      10 
      11 结果 balance = -8

      使用Look:结果不受其它线程的影响

       1 import threading
       2 balance = 0
       3 lock = threading.Lock()
       4 
       5 def change_it(n):
       6     global balance  #使用全局变量
       7     balance = balance + n
       8     balance = balance - n
       9 
      10 def run_thread(n):
      11     for i in range(10000):
      12         #先获取锁
      13         lock.acquire()
      14         #使用try来保证获得锁的进程用完一定被释放,不让其它线程称为死线程无法执行
      15         try:
      16             change_it(n)
      17         finally:
      18             #释放锁
      19             lock.release()
      20 
      21 t1 = threading.Thread(target=run_thread,args=(4,))
      22 t2 = threading.Thread(target=run_thread,args=(8,))
      23 t1.start()
      24 t2.start()
      25 t1.join()
      26 t2.join()
      27 print(balance)

       坏处:阻止了多线程并发执行;线程与线程互相获取对方的锁会导致线程挂起,无法执行

    • GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。只能使用一个核或通过C扩展实现多核处理多线程。(多核任务可通过多进程来实现)
    • ThreadLocal:将一个全局变量(如:local_school)作为 ThreadLocal() 的对象,相当于作为一个 dict ,每个线程(Thread)以自身作为key对它都可以读写 student 属性,且互不影响。即 local_school.student() 为局部变量每次都单独赋予每个线程。
       1 import threading
       2 #创建全局变量
       3 local_school = threading.local()
       4 
       5 def process_student():
       6     #当前线程关联的student
       7     std = local_school.student
       8     print('Hello,%s (in %s)'%(std,threading.current_thread().name))
       9 
      10 def process_thread(name):
      11     #绑定threadlocal的student,使赋予每个线程的局部变量student的值都不冲突,单独赋予;
      12     local_school.student = name
      13     process_student()
      14 
      15 t1 = threading.Thread(target=process_thread,args=('Jack',),name='Thread-1')
      16 t2 = threading.Thread(target=process_thread,args=('john',),name='Thread=2')
      17 t1.start()
      18 t2.start()
      19 t1.join()
      20 t2.join()
      21 
      22 输出:
      23 Hello,Jack (in Thread-1)
      24 Hello,john (in Thread=2)

       计算密集型:任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要高效利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数

      IO密集型:涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

      异步IO:用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型;单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

    三、分布式进程

      在Thread和Process中,应优选Process,因为Process更稳定,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

    multiprocessing模块中的managers子模块支持把多进程分布到多台机器上,依靠网络通信。

     1 # task_master.py
     2 #!/user/bin/pytthon
     3 # -*- coding:utf-8 -*-
     4 #已有一个通过Queue通信的多进程程序在同一台机器上运行,希望把发送任务的进程和处理任务的进程分布到两台机器上;
     5 #原有的Queue继续使用,通过managers模块把Queue通过网络暴露出去,就可让其他机器的进程访问Queue了;服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务
     6 import random,time,queue
     7 from multiprocessing.managers import BaseManager
     8 from multiprocessing import freeze_support
     9 
    10 task_queue =  queue.Queue()  # 发送任务的队列:
    11 result_queue = queue.Queue() # 接收结果的队列:
    12 class QueueManager(BaseManager):  # 从BaseManager继承的QueueManager:
    13     pass
    14 # windows下运行,非Windows系统直接跳到注册部分;
    15 def return_task_queue():
    16     global task_queue
    17     return task_queue  # 返回发送任务队列
    18 def return_result_queue ():
    19     global result_queue
    20     return result_queue # 返回接收结果队列
    21 
    22 def test():
    23     # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象;
    24     # 非Windows下的代码,window下的序列化不能使用匿名函数
    25     #QueueManager.register('get_task_queue', callable=lambda: task_queue)   
    26     #QueueManager.register('get_result_queue', callable=lambda: result_queue)
    27     QueueManager.register('get_task_queue', callable=return_task_queue)
    28     QueueManager.register('get_result_queue', callable=return_result_queue)
    29     # 绑定端口5000, 设置验证码'abc':
    30     #manager = QueueManager(address=('', 5000), authkey=b'abc')
    31     # windows需要写ip地址
    32     manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    33     manager.start()  # 启动Queue:
    34     # 获得通过网络访问的Queue对象:
    35     task = manager.get_task_queue()
    36     result = manager.get_result_queue()
    37     for i in range(10):   # 放几个任务进去:
    38         n = random.randint(0, 10000)
    39         print('Put task %d...' % n)
    40         task.put(n)
    41     # 从result队列读取结果:
    42     print('Try get results...')
    43     for i in range(10):
    44         # 这里加了异常捕获
    45         try:
    46             r = result.get(timeout=5)
    47             print('Result: %s' % r)
    48         except queue.Empty:
    49              print('result queue is empty.')
    50     # 关闭:
    51     manager.shutdown()
    52     print('master exit.')
    53 if __name__=='__main__':
    54     freeze_support()
    55     print('start!')
    56     test()
     1 # task_worker.py
     2 import time, sys, queue
     3 from multiprocessing.managers import BaseManager
     4 
     5 # 创建类似的QueueManager:
     6 class QueueManager(BaseManager):
     7     pass
     8 
     9 # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
    10 QueueManager.register('get_task_queue')
    11 QueueManager.register('get_result_queue')
    12 
    13 # 连接到服务器,也就是运行task_master.py的机器:
    14 server_addr = '127.0.0.1'
    15 print('Connect to server %s...' % server_addr)
    16 # 端口和验证码注意保持与task_master.py设置的完全一致:
    17 m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    18 # 从网络连接:
    19 m.connect()
    20 # 获取Queue的对象:
    21 task = m.get_task_queue()
    22 result = m.get_result_queue()
    23 # 从task队列取任务,并把结果写入result队列:
    24 for i in range(10):
    25     try:
    26         n = task.get(timeout=1)
    27         print('run task %d * %d...' % (n, n))
    28         r = '%d * %d = %d' % (n, n, n*n)
    29         time.sleep(1)
    30         result.put(r)
    31     except queue.Empty:
    32         print('task queue is empty.')
    33 # 处理结束:
    34 print('worker exit.')
    task_master.py与task_worker.py得到的结果
     1 # task_master.py
     2 start!
     3 Put task 6236...
     4 Put task 4265...
     5 Put task 9257...
     6 Put task 2598...
     7 Put task 181...
     8 Put task 797...
     9 Put task 7652...
    10 Put task 6855...
    11 Put task 1465...
    12 Put task 8195...
    13 Try get results...
    14 Result: 6236 * 6236 = 38887696
    15 Result: 4265 * 4265 = 18190225
    16 Result: 9257 * 9257 = 85692049
    17 Result: 2598 * 2598 = 6749604
    18 Result: 181 * 181 = 32761
    19 Result: 797 * 797 = 635209
    20 Result: 7652 * 7652 = 58553104
    21 Result: 6855 * 6855 = 46991025
    22 Result: 1465 * 1465 = 2146225
    23 Result: 8195 * 8195 = 67158025
    24 master exit.
     1 # task_worker.py
     2 Connect to server 127.0.0.1...
     3 run task 6236 * 6236...
     4 run task 4265 * 4265...
     5 run task 9257 * 9257...
     6 run task 2598 * 2598...
     7 run task 181 * 181...
     8 run task 797 * 797...
     9 run task 7652 * 7652...
    10 run task 6855 * 6855...
    11 run task 1465 * 1465...
    12 run task 8195 * 8195...
    13 worker exit.

    添加任务到 Queue 不可以直接对原始的 task_queue 进行操作,那样就绕过了 QueueManager的 封装,必须通过 manager.get_task_queue() 获得的 Queue 接口添加;

     task_worker.py 中没有创建 Queue 的代码, Queue 对象存储在 task_master.py 进程中:简书账户:‘啃饼小白’的文章中借用而来,如有侵权请告知删除,谢谢!

     Queue 是用来传递任务和接收结果,每个任务的描述数据量要尽量小。它通过 QueueManager 实现通过网络访问。由于 QueueManager 管理不止一个 Queue ,所以要给每个 Queue 的网络调用接口起个名字,比如 get_task_queue 。(笔记借鉴廖雪峰教程内容。)

    # 异步IO(协程)

    廖雪峰 异步IO 中的 协程 详细讲解博客链接:https://blog.csdn.net/SL_World/article/details/86597738

  • 相关阅读:
    【年度总结】——踏雪留痕
    ios提交程序后出现的各种问题
    c++动态库中使用命名空间的问题
    第八章 网络的时代—网络开发(4)
    USB otg 学习笔记
    servlet_3
    Windows server 2012清除并重建SID
    实时监听输入框值变化:oninput & onpropertychange
    JQuery 自动触发事件
    jquery input change事件
  • 原文地址:https://www.cnblogs.com/liqiongming/p/10197228.html
Copyright © 2011-2022 走看看