zoukankan      html  css  js  c++  java
  • 多线程&多进程解析:Python、os、sys、Queue、multiprocessing、threading

    当涉及到操作系统的时候,免不了要使用os模块,有时还要用到sys模块。

    设计到并行程序,一般开单独的进程,而不是线程,原因是python解释器的全局解释器锁GIL(global interpreter lock),本文最后会讲到。使用进程可以实现完全并行,无GIL的限制,可充分利用多cpu多核的环境。

    os/sys模块

    1、os模块

    os.system() 函数可以启动一个进程,执行完之后返回状态码。

    os.fork() 复制一个进程,如果是子进程返回0,如果是父进程返回子进程的pid,使用这个函数的时候,建议你学习一下linux编程的知识。

    os.popen 以管道的方式创建进程。

    os.spawnl 也可以创建进程,并能指定环境变量。

    os.kill(pid, sig) 关闭一个进程,pid是进程号,sig是信号。与fork配合使用,例如你刚才用fork创建了一个子进程,它的pid是11990, 那么调用

    os.kill( 11990, signal.CTRL_BREAK_EVENT)

    就以ctrl+c的方式杀死了这个进程。

    os.wait() -> (pid, status)找到任一个僵死子进程,或者等待任一个子进程的SIGCHLD信号

    os.waitpid(pid, options) -> (pid, status) 等待给定进程结束

    除了利用os模块实现多进程和通信,还有一个模块multiprocessing封装了很多创建进程和进程间通信的操作,发挥多核的威力。

    附:

    文件操作也是与操作系统相关的操作,也被封装在os模块。

    文件操作的详细内容见  http://www.cnblogs.com/xinchrome/p/5011304.html

    2、sys模块

    同样是一个与系统相关的模块,它们都表示了程序运行的上下文环境。但是与os模块不同的是,os模块主要封装系统操作,sys模块主要封装系统中的各种环境参数。

    比如文件操作、进程线程操作封装在os模块中;

    标准输入输出stdout/stdin、命令行参数argv、环境变量path、平台platform等参数封装在sys模块中;

    不过sys中也含有一些进程操作,比如sys.exit(n)和sys.exit('Unable to create first child.')

    多进程multiprocessing

     multiprocessing模块的内容:

    multiprocessing.Process(target=run)类的实例表示一个进程,具有字段 pid,方法 start() join()等

    multiprocessing.Pool(processes=4) 类的实例表示一个进程池

    multiprocessing.Lock类的实例表示一个锁,具有acquire()和release() 方法

    multiprocessing.Semaphore(2) 信号量类的实例表示一个信号量,可以指定初始值,具有 acquire() 和 release() 方法

    multiprocessing.Event() 表示一个信号,用于实现多进程等待某一个进程的情况

    进程间要实现通信,除了锁、信号量、事件,还有队列multiprocessing.Queue。

    import multiprocessing
    
    def writer_proc(q):      
        try:         
            q.put(1, block = False) 
        except:         
            pass   
    
    def reader_proc(q):      
        try:         
            print q.get(block = False) 
        except:         
            pass
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))  
        writer.start()   
    
        reader = multiprocessing.Process(target=reader_proc, args=(q,))  
        reader.start()  
    
        reader.join()  
        writer.join()

    multiprocessing.Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

    get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

     

    进程同步互斥实例:

    import multiprocessing
    import time
    
    # 信号量实现同步,进程间也可以使用信号量
    def preq(s):
        print "a"
        s.release()
        s.release()
    
    def worker(s,i):
        s.acquire()
        s.acquire()
        print(multiprocessing.current_process().name + " acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + " release")
        s.release()
    if __name__ == "__main__":
      
        s = multiprocessing.Semaphore(0)
        for i in range(1):
            pre = multiprocessing.Process(target=preq, args=(s,))
            pre.start()
            p = multiprocessing.Process(target=worker, args=(s,1))
            p.start()
    
    
    # 锁实现进程间对文件的互斥访问
    def worker_with(lock, f):
        with lock:
            fs = open(f,"a+")
            fs.write('Lock acquired via with
    ')
            fs.close()
            
    def worker_no_with(lock, f):
        lock.acquire()
        try:
            fs = open(f,"a+")
            fs.write('Lock acquired directly
    ')
            fs.close()
        finally:
            lock.release()
    if __name__ == "__main__":
        f = "file.txt"
      
        lock = multiprocessing.Lock()
        w = multiprocessing.Process(target=worker_with, args=(lock, f))
        nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
        w.start()
        nw.start()
        w.join()
        nw.join()
    
    
    
    # Event实现同步,Event类的实例表示一个信号,进程调用它的wait方法等待被唤醒,调用set方法唤醒所有正在等待的进程
    def wait_for_event(e):
        """Wait for the event to be set before doing anything"""
        print ('wait_for_event: starting')
        e.wait()
        print ('wait_for_event: e.is_set()->' + str(e.is_set()))
    def wait_for_event_timeout(e, t):
        """Wait t seconds and then timeout"""
        print ('wait_for_event_timeout: starting')
        e.wait(t)
        print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))
    
    if __name__ == '__main__':
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(name='block', 
                                     target=wait_for_event,
                                     args=(e,))
        w1.start()
        w2 = multiprocessing.Process(name='non-block', 
                                     target=wait_for_event_timeout, 
                                     args=(e, 2))
        w2.start()
        time.sleep(3)
        e.set()
        print ('main: event is set')
    View Code

    Process类中定义的方法

    | is_alive(self)
    | Return whether process is alive
    |
    | join(self, timeout=None)
    | Wait until child process terminates
    |
    | run(self)
    | Method to be run in sub-process; can be overridden in sub-class
    |
    | start(self)
    | Start child process
    |
    | terminate(self)
    | Terminate process; sends SIGTERM signal or uses TerminateProcess()

    以上来自于Python自带帮助

    进程处理信号 signal

     利用signal模块,进程可以捕获信号,根据相应的handler做处理。

    信号(signal)-- 进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。
    几个常用信号:
        SIGINT 终止进程 中断进程 (control+c)

        SIGQUIT 退出进程

        SIGTERM 终止进程: 软件终止信号 (命令行中输入kill命令时,向进程发送的默认信号) 当直接写kill PID,默认是向进程发送SIGTERM

        SIGKILL 终止进程:杀死进程,捕捉这个信号会报错,也就是进程不能捕捉此信号(kill -9)
        SIGALRM 闹钟信号。Alarms信号是一个特殊信号类型,它可以让程序要求系统经过一段时间对自己发送通知。os 标准模块中指出,它可用于避免无限制阻塞 I/O 操作或其它系统调用。

        SIGCHLD 子进程退出时对父进程发出的信号,如果父进程还没有处理它,子进程将会停留在僵死状态等待其父进程调用wait函数,这个状态下的子进程就是僵死进程

    PS:常用信号简介:

    ctrl-c 发送 SIGINT 信号给前台进程组中的所有进程。常用于终止正在运行的程序。
    ctrl-z 发送 SIGTSTP 信号给前台进程组中的所有进程,常用于挂起一个进程。
    ctrl-d 不是发送信号,而是表示一个特殊的二进制值,表示 EOF,也就是输入流(例如普通文件或者stdin)的结束。
    ctrl- 发送 SIGQUIT 信号给前台进程组中的所有进程,终止前台进程并生成 core 文件。

    常常会在python程序被关闭之前加一个钩子,用atexit模块以及signal模块来实现

    父进程捕获终止信号后,还需要向每个子进程发送终止信号。

    注意信号处理函数需要接受两个参数。

    #!/usr/bin python
    
    # 正常退出或者被ctl+c终止时,进程捕获信号,调用处理函数(钩子)
    import atexit
    from signal import signal, SIGTERM
    
    def test():
            print 'exit........'
    atexit.register(test)
    signal(SIGTERM, lambda signum, stack_frame: exit(1))
    
    while True:
            pass
    
    
    # 进程发送信号终止其他进程
    import os  
    import signal  
       
    #发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改  
    os.kill(16175,signal.SIGTERM)  
    os.kill(16175,signal.SIGUSR1) 
    
    
    # Linux编程范式:fork(),等待SIGCHLD信号
    import os  
    import signal  
    from time import sleep  
       
    def onsigchld(a,b):  
        print '收到子进程结束信号'  
    signal.signal(signal.SIGCHLD,onsigchld)  
       
    pid = os.fork()  
    if pid == 0:  
       print '我是子进程,pid是',os.getpid()  
       sleep(2)  
    else:  
        print '我是父进程,pid是',os.getpid()  
        os.wait()  #等待子进程结束  
    
    
    # 闹钟信号,用于告诉操作系统向自己发送信号
    import signal
    import time
    
    def receive_alarm(signum, stack):
        print 'Alarm :', time.ctime()
    
    # Call receive_alarm in 2 seconds
    signal.signal(signal.SIGALRM, receive_alarm)
    signal.alarm(2)
    
    print 'Before:', time.ctime()
    time.sleep(10)
    print 'After :', time.ctime() 
    View Code

    多线程threading & thread

      与进程不同,线程要实现同步,直接用Python自带的Queue模块即可。Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

         python多线程编程,一般使用thread和threading模块。thread模块想对较底层,threading模块对thread模块进行了封装,更便于使用。所有,通常多线程编程使用threading模块。线程的创建一般有两种:①将创建的函数传递进threading.Thread()对象的target字段,可以是函数或者定义了__call__方法的类实例。②继承threading.Thread类,通常重写run()方法。

    1 threading模块的内容

    Thread类的实例可以引用一个线程,这是我们用的最多的一个类,你可以指定目标线程函数执行或者自定义继承自它的子类都可以实现子线程功能;

    Timer类是Thread类的子类,表示等待一段时间后才开始运行某段代码,或者重复运行某段代码;

    (Python自带的)Queue类的实例是实现了多生产者(Producer)、多消费者(Consumer)的队列,支持锁原语,能够在多个线程之间提供很好的同步支持;

    Lock类的实例引用了一个锁原语,这个我们可以对全局变量互斥时使用,提供acquire和release方法;

    RLock的实例表示可重入锁,使单线程可以再次获得已经获得的锁;

    Condition类的实例表示条件变量,能让一个线程停下来,等待其他线程满足某个“条件”,除了提供acquire和release方法外,还提供了wait和notify方法,相当于一个多功能信号量;

    Event 类的实例表示通用的条件变量。多个线程可以等待某个事件发生,在事件发生后,所有的线程都被激活;

    Semaphore类的实例表示信号量,为等待资源的线程提供一个类似队列的结构,提供acquire和release方法,初始化的时候可以指定初值Semaphore(3);

    BoundedSemaphore 与semaphore类似,但不允许超过初始值;

     threading.Thread类的内容:

    getName(self) 返回线程的名字

    isAlive(self) 布尔标志,表示这个线程是否还在运行中

    isDaemon(self) 返回线程的daemon标志

    join(self, timeout=None) 程序挂起,直到线程结束,如果给出timeout,则最多阻塞timeout秒

    run(self) 定义线程的功能函数

    setDaemon(self, daemonic) 把线程的daemon标志设为daemonic

    setName(self, name) 设置线程的名字

    start(self) 开始线程执行

    ps:th.join()方法可能不是很安全,如果th对应的线程没有被真正启动,那么调用th.join()的线程将不会等待,而会继续运行下去。用信号量更好。

    多线程举例:

    #coding:utf-8
    import threading, time 
    
    #最简单的启动线程的方式
    def sayHi(): 
        time.sleep(1) 
        print 'Hi, linuxapp' 
    
    th=threading.Thread(target=sayHi) 
    th.start() 
    th.join() 
    
     
    # 使用threading.Thread(),设置线程类实例的target属性,表示一个线程
    def T():
        print threading.current_thread().getName()
     
    t1 = threading.Thread(target=T, name='tt11')
    t1.start()
    t1.join()
    
    # 通过threading.Thread类的子类实例表示线程,注意父类构造方法__init__不能省略
    class T2(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
     
        def run(self):
            print "in run() of T2  " + threading.current_thread().getName()
     
    # threading.Lock类的实例表示一个互斥锁,一个资源被加锁后,其他线程不能访问
    class T3(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.counter = 0;
            self.mutex = threading.Lock()
    
        def run(self):
            time.sleep(1)
            if self.mutex.acquire():
                self.counter += 1
                print self.counter
                self.mutex.release()
     
    
    # 如果同一个线程需要多次获得资源,如果不使用 mutex = threading.RLock() ,就会死锁
    class T4(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.counter = 0
            self.mutex = threading.Lock()
     
        def run(self):
            time.sleep(1)
            if self.mutex.acquire():
                self.counter += 1
                if self.mutex.acquire():
                    self.counter += 1
                    self.mutex.release()
                self.mutex.release()
     
     
    def main():
        t = T3()
        t.start()
     
    if __name__ == '__main__':
        main()
     
    # threading.Condition类的实例表示一个条件变量,相当于多功能信号量
    condition = threading.Condition()  
    products = 0  
        
    class Producer(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
               
        def run(self):  
            global condition, products  
            while True:  
                if condition.acquire():  
                    if products < 10:  
                        products += 1;  
                        print "Producer(%s):deliver one, now products:%s" %(self.name, products)  
                        condition.notify()  
                    else:  
                        print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)  
                        condition.wait();  
                    condition.release()  
                    time.sleep(2)  
               
    class Consumer(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
               
        def run(self):  
            global condition, products  
            while True:  
                if condition.acquire():  
                    if products > 1:  
                        products -= 1  
                        print "Consumer(%s):consume one, now products:%s" %(self.name, products)  
                        condition.notify()  
                    else:  
                        print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)  
                        condition.wait();  
                    condition.release()  
                    time.sleep(2)  
                       
    if __name__ == "__main__":  
        for p in range(0, 2):  
            p = Producer()  
            p.start()  
               
        for c in range(0, 10):  
            c = Consumer()  
            c.start()
    
    
    # threading.Event类的实例表示一个信号,如果信号signal为true,那么等待这个signal的所有线程都将可以运行
    class MyThread(threading.Thread):  
        def __init__(self, signal):  
            threading.Thread.__init__(self) 
            self.singal = signal  
               
        def run(self):  
            print "I am %s,I will sleep ..."%self.name  
            # 进入等待状态
            self.singal.wait()  
            print "I am %s, I awake..." %self.name  
               
    if __name__ == "__main__":
        # 初始 为 False 
        singal = threading.Event()  
        for t in range(0, 3):  
            thread = MyThread(singal)  
            thread.start()  
           
        print "main thread sleep 3 seconds... "  
        time.sleep(3)  
        # 唤醒含有signal, 处于等待状态的线程  
        singal.set()
    View Code

    python多线程的限制
    python多线程有个讨厌的限制,全局解释器锁(global interpreter lock),这个锁的意思是任一时间只能有一个线程使用解释器,跟单cpu跑多个程序也是一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着,试想你的多个线程中有这么一个线程,多线程生生被搞成串行;当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了!所以是否使用这个模块需要考虑面对的任务类型。

  • 相关阅读:
    MQTT:前端js客户端库MQTT.js
    MQTT:java客户端库Paho
    EMQ X:认证
    EMQ X:初体验
    MQTT协议
    连接Mysql时报javax.net.ssl.SSLHandshakeException No appropriate protocol (protocol is disabled or cipher suites are inappropriate)错误
    微星11代gp76 3070解锁140w功率
    Jenkins:使用ssh方式拉取gitlab代码
    juc:AQS
    juc:LockSupport
  • 原文地址:https://www.cnblogs.com/xinchrome/p/5031497.html
Copyright © 2011-2022 走看看