zoukankan      html  css  js  c++  java
  • python--第九天总结

    python 多进程和多线程

    多线程可以共享全局变量,多进程不能。多线程中,所有子线程的进程号相同;多进程中,不同的子进程进程号不同。

    【多进程】

    Python在2.6引入了多进程的机制,并提供了丰富的组件及api以方便编写并发应用。multiprocessing包的组件Process, Queue, Pipe, Lock等组件提供了与多线程类似的功能。使用这些组件,可以方便地编写多进程并发程序。
     
    Process
     
    Process等同于java.lang.Thread。start方法用以启动某个进程。一个简单的示例:
     
    from multiprocessing import Process
    import os
    import time
    
    def sleeper(name, seconds):
        print "Process ID# %s" % (os.getpid())
        print "Parent Process ID# %s" % (os.getppid())
        print "%s will sleep for %s seconds" % (name, seconds)
        time.sleep(seconds)
    
    if __name__ == "__main__":
        child_proc = Process(target=sleeper, args=('bob', 5))
        child_proc.start()
        print "in parent process after child process start"
        print "parent process abount to join child process"
        child_proc.join()
        print "in parent process after child process join"
        print "the parent's parent process: %s" % (os.getppid())
    实例化一个Process必须要指定target和args。target是新的进程的入口方法,可以认为是main方法。args是该方法的 参数列表。启动进程类似于启动Thread,必须要调用start方法。也可以继承Process,覆盖run方法,在run方法中实现该进程的逻辑。调 用join方法会阻塞当前调用进程,直到被调用进程运行结束。
     
    手工终止一个进程可以调用terminate方法,在UNIX系统中,该方法会发送SIGTERM信号量,而在windows系统中,会借助 TerminateProcess方法。需要注意的是,exit处理逻辑并不会被执行,该进程的子进程不会被终止,他们只会变成孤儿进程。
     
    Queue
     
    Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参 数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到 该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full 异常。
     
    get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为 True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为 False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一 段示例代码:
     
    from multiprocessing import Process, Queue
    
    def offer(queue):
        queue.put("Hello World")
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=offer, args=(q,))
        p.start()
        print q.get()
    Pipes
     
    Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说 conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
     
    send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
    from multiprocessing import Process, Pipe
    
    def send(conn):
        conn.send("Hello World")
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=send, args=(child_conn,))
        p.start()
        print parent_conn.recv()
    同步
     
    multiprocessing包提供了Condition, Event, Lock, RLock, Semaphore等组件可用于同步。下面是使用Lock的一个示例:
    from multiprocessing import Process, Lock
    
    def l(lock, num):
        lock.acquire()
        print "Hello Num: %s" % (num)
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(20):
            Process(target=l, args=(lock, num)).start()

    【多线程】

    目前python 提供了几种多线程实现方式 thread,threading,multithreading ,其中thread模块比较底层,而threading模块是对thread做了一些包装,可以更加方便的被使用。
    2.7版本之前python对线程的支持还不够完善,不能利用多核CPU,但是2.7版本的python中已经考虑改进这点,出现了multithreading  模块。threading模块里面主要是对一些线程的操作对象化,创建Thread的class。一般来说,使用线程有两种模式:
    A 创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;
    B 继承Thread类,创建一个新的class,将要执行的代码 写到run函数里面。

    下面介绍两种实现方法。
    第一种 创建函数并且传入Thread 对象

        import threading,time
        from time import sleep, ctime
        def now() :
            return str( time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime() ) )
    
        def test(nloop, nsec):
            print 'start loop', nloop, 'at:', now()
            sleep(nsec)
            print 'loop', nloop, 'done at:', now()
    
        def main():
            print 'starting at:',now()
            threadpool=[]
    
            for i in xrange(10):
                th = threading.Thread(target= test,args= (i,2))
                threadpool.append(th)
    
            for th in threadpool:
                th.start()
    
            for th in threadpool :
                threading.Thread.join( th )
    
            print 'all Done at:', now()
    
        if __name__ == '__main__':
                main()

    执行结果:

    第二种 创建一个新的class

        import threading ,time
        from time import sleep, ctime
        def now() :
            return str( time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime() ) )
    
        class myThread (threading.Thread) :
              """docstring for myThread"""
              def __init__(self, nloop, nsec) :
                  super(myThread, self).__init__()
                  self.nloop = nloop
                  self.nsec = nsec
    
              def run(self):
                  print 'start loop', self.nloop, 'at:', ctime()
                  sleep(self.nsec)
                  print 'loop', self.nloop, 'done at:', ctime()
        def main():
             thpool=[]
             print 'starting at:',now()
            
             for i in xrange(10):
                 thpool.append(myThread(i,2))
                 
             for th in thpool:
                 th.start()
           
             for th in thpool:
                 th.join()
            
             print 'all Done at:', now()
    
        if __name__ == '__main__':
                main()

    执行结果:

    我们将会看到一些在 Python 中使用线程的实例和如何避免线程之间的竞争。
    你应当将下边的例子运行多次,以便可以注意到线程是不可预测的和线程每次运行出的不同结果。声明:从这里开始忘掉你听到过的关于 GIL 的东西,因为 GIL 不会影响到我想要展示的东西。

    示例1,我们将要请求五个不同的url:

    1、单线程

    import time
    import urllib2
      
    def get_responses():
        urls = [
            'http://www.google.com',
            'http://www.amazon.com',
            'http://www.ebay.com',
            'http://www.alibaba.com',
            'http://www.reddit.com'
        ]
        start = time.time()
        for url in urls:
            print url
            resp = urllib2.urlopen(url)
            print resp.getcode()
        print "Elapsed time: %s" % (time.time()-start)
      
    get_responses()

    输出是:

    http://www.google.com 200
    http://www.amazon.com 200
    http://www.ebay.com 200
    http://www.alibaba.com 200
    http://www.reddit.com 200

    Elapsed time: 3.0814409256

    解释:

    url顺序的被请求
    除非cpu从一个url获得了回应,否则不会去请求下一个url
    网络请求会花费较长的时间,所以cpu在等待网络请求的返回时间内一直处于闲置状态。

    2、多线程

    import urllib2
    import time
    from threading import Thread
      
    class GetUrlThread(Thread):
        def __init__(self, url):
            self.url = url 
            super(GetUrlThread, self).__init__()
      
        def run(self):
            resp = urllib2.urlopen(self.url)
            print self.url, resp.getcode()
      
    def get_responses():
        urls = [
            'http://www.google.com', 
            'http://www.amazon.com', 
            'http://www.ebay.com', 
            'http://www.alibaba.com', 
            'http://www.reddit.com'
        ]
        start = time.time()
        threads = []
        for url in urls:
            t = GetUrlThread(url)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        print "Elapsed time: %s" % (time.time()-start)
      
    get_responses()

    输出:
    http://www.reddit.com 200
    http://www.google.com 200
    http://www.amazon.com 200
    http://www.alibaba.com 200
    http://www.ebay.com 200

    Elapsed time: 0.689890861511

    解释:
    意识到了程序在执行时间上的提升
    我们写了一个多线程程序来减少cpu的等待时间,当我们在等待一个线程内的网络请求返回时,这时cpu可以切换到其他线程去进行其他线程内的网络请求。
    我们期望一个线程处理一个url,所以实例化线程类的时候我们传了一个url。
    线程运行意味着执行类里的run()方法。
    无论如何我们想每个线程必须执行run()。
    为每个url创建一个线程并且调用start()方法,这告诉了cpu可以执行线程中的run()方法了。
    我们希望所有的线程执行完毕的时候再计算花费的时间,所以调用了join()方法。
    join()可以通知主线程等待这个线程结束后,才可以执行下一条指令。
    每个线程我们都调用了join()方法,所以我们是在所有线程执行完毕后计算的运行时间。
    关于线程:
    cpu可能不会在调用start()后马上执行run()方法。
    你不能确定run()在不同线程建间的执行顺序。
    对于单独的一个线程,可以保证run()方法里的语句是按照顺序执行的。
    这就是因为线程内的url会首先被请求,然后打印出返回的结果。

    示例2,全局变量的线程安全问题(race condition)

    1、BUG 版

    我们将会用一个程序演示一下多线程间的资源竞争,并修复这个问题。

    from threading import Thread
      
    #define a global variable
    some_var = 0
      
    class IncrementThread(Thread):
        def run(self):
            #we want to read a global variable
            #and then increment it
            global some_var
            read_value = some_var
            print "some_var in %s is %d" % (self.name, read_value)
            some_var = read_value + 1
            print "some_var in %s after increment is %d" % (self.name, some_var)
      
    def use_increment_thread():
        threads = []
        for i in range(50):
            t = IncrementThread()
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        print "After 50 modifications, some_var should have become 50"
        print "After 50 modifications, some_var is %d" % (some_var,)
      
    use_increment_thread()

    多次运行这个程序,你会看到多种不同的结果。
    解释:
    有一个全局变量,所有的线程都想修改它。
    所有的线程应该在这个全局变量上加 1 。
    有50个线程,最后这个数值应该变成50,但是它却没有。
    为什么没有达到50?
    在some_var是15的时候,线程t1读取了some_var,这个时刻cpu将控制权给了另一个线程t2。
    t2线程读到的some_var也是15
    t1和t2都把some_var加到16
    当时我们期望的是t1 t2两个线程使some_var + 2变成17
    在这里就有了资源竞争。
    相同的情况也可能发生在其它的线程间,所以出现了最后的结果小于50的情况。

    2、解决资源竞争

    from threading import Lock, Thread
    lock = Lock()
    some_var = 0
      
    class IncrementThread(Thread):
        def run(self):
            #we want to read a global variable
            #and then increment it
            global some_var
            lock.acquire()
            read_value = some_var
            print "some_var in %s is %d" % (self.name, read_value)
            some_var = read_value + 1
            print "some_var in %s after increment is %d" % (self.name, some_var)
            lock.release()
      
    def use_increment_thread():
        threads = []
        for i in range(50):
            t = IncrementThread()
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        print "After 50 modifications, some_var should have become 50"
        print "After 50 modifications, some_var is %d" % (some_var,)
      
    use_increment_thread()

    再次运行这个程序,达到了我们预期的结果。
    解释:
    Lock 用来防止竞争条件
    如果在执行一些操作之前,线程t1获得了锁。其他的线程在t1释放Lock之前,不会执行相同的操作
    我们想要确定的是一旦线程t1已经读取了some_var,直到t1完成了修改some_var,其他的线程才可以读取some_var
    这样读取和修改some_var成了逻辑上的原子操作。

    示例3,多线程环境下的原子操作

    让我们用一个例子来证明一个线程不能影响其他线程内的变量(非全局变量)。
    time.sleep()可以使一个线程挂起,强制线程切换发生。

    1、BUG 版

    from threading import Thread
    import time
      
    class CreateListThread(Thread):
        def run(self):
            self.entries = []
            for i in range(10):
                time.sleep(0.01)
                self.entries.append(i)
            print self.entries
      
    def use_create_list_thread():
        for i in range(3):
            t = CreateListThread()
            t.start()
      
    use_create_list_thread()

    运行几次后发现并没有打印出正确的结果。当一个线程正在打印的时候,cpu切换到了另一个线程,所以产生了不正确的结果。我们需要确保print self.entries是个逻辑上的原子操作,以防打印时被其他线程打断。

    2、加锁保证操作的原子性

    我们使用了Lock(),来看下边的例子。

    from threading import Thread, Lock
    import time
      
    lock = Lock()
      
    class CreateListThread(Thread):
        def run(self):
            self.entries = []
            for i in range(10):
                time.sleep(0.01)
                self.entries.append(i)
            lock.acquire()
            print self.entries
            lock.release()
      
    def use_create_list_thread():
        for i in range(3):
            t = CreateListThread()
            t.start()
      
    use_create_list_thread()

    这次我们看到了正确的结果。证明了一个线程不可以修改其他线程内部的变量(非全局变量)。

    示例4,Python多线程简易版:线程池 threadpool

    上面的多线程代码看起来有点繁琐,下面我们用 treadpool 将案例 1 改写下:

    import threadpool
    import time
    import urllib2
     
    urls = [
        'http://www.google.com', 
        'http://www.amazon.com', 
        'http://www.ebay.com', 
        'http://www.alibaba.com', 
        'http://www.reddit.com'
    ]
     
    def myRequest(url):
        resp = urllib2.urlopen(url)
        print url, resp.getcode()
     
     
    def timeCost(request, n):
      print "Elapsed time: %s" % (time.time()-start)
     
    start = time.time()
    pool = threadpool.ThreadPool(5)
    reqs = threadpool.makeRequests(myRequest, urls, timeCost)
    [ pool.putRequest(req) for req in reqs ]

    解释关键代码:

    • ThreadPool(poolsize)  

    表示最多可以创建poolsize这么多线程;
     

    • makeRequests(some_callable, list_of_args, callback)  

    makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行;

    注意:threadpool 是非线程安全的。

    参考

    1、线程安全及Python中的GIL

    线程安全

    作者博客链接.

    Thread safety is a computer programming concept applicable in the context of multi-threaded programs.
    A piece of code is thread-safe if it functions correctly during simultaneous execution by multiple threads.
    In particular, it must satisfy the need for multiple threads to access the same shared data,
    and the need for a shared piece of data to be accessed by only one thread at any given time.
    

    上面是wikipedia中的解释, 换句话说, 线程安全 是在多线程的环境下, 线程安全能够保证多个线程同时执行时程序依旧运行正确, 而且要保证对于共享的数据,可以由多个线程存取,但是同一时刻只能有一个线程进行存取.

    既然,多线程环境下必须存在资源的竞争,那么如何才能保证同一时刻只有一个线程对共享资源进行存取?

    加锁, 对, 加锁可以保证存取操作的唯一性, 从而保证同一时刻只有一个线程对共享数据存取.

    通常加锁也有2种不同的粒度的锁:

    1. fine-grained(所谓的细粒度), 那么程序员需要自行地加,解锁来保证线程安全
    2. coarse-grained(所谓的粗粒度), 那么语言层面本身维护着一个全局的锁机制,用来保证线程安全

    前一种方式比较典型的是 javaJython 等, 后一种方式比较典型的是 CPython (即Python).

    前一种本文不进行讨论, 具体可参考 java 中的多线程编程部分.

    至于Python中的全局锁机制,也即 GIL (Global Interpreter Lock), 下面主要进行一些讨论.

    GIL

    作者博客链接.

    什么是 GIL ? 答案可参考wikipedia中的说明, 简单地说就是:

    每一个interpreter进程,只能同时仅有一个线程来执行, 获得相关的锁, 存取相关的资源.

    那么很容易就会发现,如果一个interpreter进程只能有一个线程来执行, 多线程的并发则成为不可能, 即使这几个线程之间不存在资源的竞争.

    从理论上讲,我们要尽可能地使程序更加并行, 能够充分利用多核的功能, 那么Python为什么要使用 全局的 GIL 来限制这种并行呢?

    这个问题,其实已经得到了很多的讨论, 不止十年, 可以参考下面的文档:

    反对 GIL 的声音:

    1. An open letter to Guido van Rossum (这个文章值得一看,下面有很多的留言也值得一看)

    认为 GIL 不能去除的:

    1. It isn't Easy to Remove the GIL (这个文章来自python作者 Guido, 他说明了什么要使用 GIL)

    其它的一些讨论很容易从Google来搜索得到, 譬如: GIL at google.

    那么,简单总结下双方的观点.

    认为应该去除 GIL 的:

    1. 不顺应计算机的发展潮流(多核时代已经到来, 而 GIL 会很影响多核的使用)
    2. 大幅度提升多线程程序的速度

    认为不应该去除 GIL 的(如果去掉,会):

    1. 写python的扩展(module)时会遇到锁的问题,程序员需要繁琐地加解锁来保证线程安全
    2. 会较大幅度地减低单线程程序的速度

    后者是 Guido 最为关切的, 也是不去除 GIL 最重要的原因, 一个简单的尝试是在1999年(十年前), 最终的结果是导致单线程的程序速度下降了几乎2倍.

    归根结底,其实就是多进程与多线程的选择问题, 有一段话比较有意思, 可以参考 http://www.artima.com/forums/flat.jsp?forum=106&thread=214235.

    我引用如下:

    I actually don't think removing the GIL is a good solution.
    But I don't think threads are a good solution, either.
    They're too hard to get right, and I say that after spending literally years studying threading in both C++ and Java.
    Brian Goetz has taken to saying that no one can get threading right.
    

    引自 Bruce Eckel 对 Guido 的回复. 而 Bruce Eckel 是何许人, 如果你了解 java 或者 C++, 那么应该不会不知道他.

    个人的观点

    作者博客链接.

    那么,从我自己的角度来看(我没有太多的多线程编程经验), 先不论多线程的速度优势等,我更加喜欢多进程的是:

    1. 简单,无需要人为(或者语言级别)的加解锁. 想想 java 中的多线程编程,程序员通常会在此处出错(java程序员可以思考下)
    2. 安全, 这也是浏览器为什么开始使用多进程的一个原因

    依照Python自身的哲学, 简单 是一个很重要的原则,所以, 使用 GIL 也是很好理解的.

    当然你真的需要充分利用多核的速度优势,此时python可能并非你最佳的选择,请考虑别的语言吧,如 javaerlang 等.

    2、Python 不能利用多核的问题以后能被解决吗?

    本文主要讨论了 python 中的 单线程、多线程、多进程、异步、协程、多核、VM、GIL、GC、greenlet、Gevent、性能、IO 密集型、CPU 密集型、业务场景 等问题,以这些方面来判断去除 GIL 实现多线程的优劣:

    http://www.zhihu.com/question/21219976

  • 相关阅读:
    atitit.按钮光标滑过高亮切换以及其他动态效果的实现css html js attilax总结
    atitit. 文件上传带进度条 atiUP 设计 java c# php
    atitit.新增编辑功能 跟orm的实现 attilax p31
    atitit. java jsoup html table的读取解析 总结
    atitit.设计文档操作日志的实现
    atitit.资源释放机制attilax总结
    (转)Android的消息机制,用Android线程间通信的Message机制,Android中Handler的使用方法——在子线程中更新界面,handler机制
    (转)Android笔记handler机制
    (转)数据存储
    (转)android连网详解
  • 原文地址:https://www.cnblogs.com/wjx1/p/5077716.html
Copyright © 2011-2022 走看看