zoukankan      html  css  js  c++  java
  • Python的多线程(threading)与多进程(multiprocessing )

    进程:程序的一次执行(程序载入内存,系统分配资源运行)。每个进程有自己的内存空间,数据栈等,进程之间可以进行通讯,但是不能共享信息。

    线程:所有的线程运行在同一个进程中,共享相同的运行环境。每个独立的线程有一个程序入口,顺序执行序列和程序的出口。

    线程的运行可以被强占,中断或者暂时被挂起(睡眠),让其他的线程运行。一个进程中的各个线程共享同一片数据空间。

    多线程

    import threading 
    
    def thread_job():
        print "this is added thread,number is {}".format(threading.current_thread())
        
    def main():
        added_thread = threading.Thread(target = thread_job) #添加线程
        added_thread.start() #执行添加的线程    
        
        print threading.active_count() #当前已被激活的线程的数目
        print threading.enumerate()  #激活的是哪些线程
        print threading.current_thread() #正在运行的是哪些线程
        
    if __name__ == "__main__":
        main()
    this is added thread,number is <Thread(Thread-6, started 6244)>6
    [<HistorySavingThread(IPythonHistorySavingThread, started 7588)>, <ParentPollerWindows(Thread-3, started daemon 3364)>, <Heartbeat(Thread-5, started daemon 3056)>, <_MainThread(MainThread, started 1528)>, <Thread(Thread-6, started 6244)>, <Thread(Thread-4, started daemon 4700)>]
    <_MainThread(MainThread, started 1528)>
    
    #join 功能 等到线程执行完之后 再回到主线程中去
    import threading 
    import time
    def T1_job():
        print "T1 start
    "
        for i in range(10):
            time.sleep(0.1)
        print "T1 finish"
    def T2_job():
        print 'T2 start'
        print 'T2 finish'
        
    def main():
        thread1 = threading.Thread(target = T1_job) #添加线程
        thread2 = threading.Thread(target = T2_job)
        thread1.start() #执行添加的线程 
        thread2.start()
        
        thread1.join()
        thread2.join()
        print 'all done
    '
        
    if __name__ == "__main__":
        main()
    
    
    T1 start
    T2 start
    T2 finish
    
    T1 finish
    all done
    

      

    #queue 多线程各个线程的运算的值放到一个队列中,到主线程的时候再拿出来,以此来代替
    #return的功能,因为在线程是不能返回一个值的
    import time
    import threading
    from Queue import Queue
    
    def job(l,q):
        q.put([i**2 for i in l])
        
    def multithreading(data):
        q = Queue()
        threads = []
        for i in xrange(4):
            t = threading.Thread(target = job,args = (data[i],q))
            t.start()
            threads.append(t)
        for thread in threads:
            thread.join()
        results = []
        for _ in range(4):
            results.append(q.get())
        print results
            
    if __name__ == "__main__":
        data = [[1,2,3],[4,5,6],[3,4,3],[5,5,5]]
        multithreading(data)
    
    [[1, 4, 9], [16, 25, 36], [9, 16, 9], [25, 25, 25]]
    

      

    #多线程的锁
    import threading 
    import time
    
    def T1_job():
        global A,lock
        lock.acquire()
        
        for i in xrange(10):
            A += 1
            print 'T1_job',A
            
        lock.release()
        
    def T2_job():
        global A,lock
        lock.acquire()
        
        for i in xrange(10):
            A += 10
            print 'T2_job',A
            
        lock.release()
        
    if __name__ == "__main__":
        lock = threading.Lock()
        A = 0 #全局变量
        thread1 = threading.Thread(target = T1_job) #添加线程
        thread2 = threading.Thread(target = T2_job)
        
        thread1.start() #执行添加的线程 
        thread2.start()
        
        thread1.join()
        thread2.join()  

    全局解释器锁GIL(Global Interpreter Lock)

    GIL并不是Python的特性,他是CPython引入的概念,是一个全局排他锁。

    解释执行python代码时,会限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。
     
    所以,虽然CPython的线程库直接封装了系统的原生线程,但CPython整体作为一个进程,同一时间只会有一个获得GIL的线程在跑,其他线程则处于等待状态。这就造成了即使在多核CPU中,多线程也只是做着分时切换而已,所以多线程比较适合IO密集型,不太适合CPU密集型的任务。

    同一时刻一个解释进程只有一行bytecode 在执行

    #python中 多线程的效率不一定就是 3 个线程就 三倍的效率
    #在python中有GIL,线程锁,保证只有一个线程在计算,在不停的切换
    #所以 如果是不同的任务,任务之间差别很大,线程之间可以分工合作,可以提高效率,如一个发送消息,另一个接收消息。
    #如果处理一大堆的数据,多线程帮不上,需要mutliprocessing 因为每个核有单独的逻辑空间,互相不影响
    import time
    import threading
    from Queue import Queue
    def job(l,q):
        q.put(sum(l))
    
    def normal(l):
        print sum(l)
    
    def multithreading(l):
        q = Queue()
        threads = []
        for i in range(3):
            t = threading.Thread(target = job,args = (l,q),name = 'T{}'.format(i))
            t.start()
            threads.append(t)
        [t.join() for t in threads]
        total = 0
        for _ in range(3):
            total += q.get()
        print total
        
    if __name__ == '__main__':
        l = list(xrange(1000000))
        s_t = time.time()
        normal(l*3)
        print 'normal time:',time.time()-s_t
        s_t = time.time()
        multithreading(l)
        print 'multithreading time:',time.time() -s_t
    
    
    1499998500000
    normal time: 0.297999858856
    1499998500000
    multithreading time: 0.25200009346
    

     多进程

    multiprocessing库弥补了thread库因为GIL而低效的缺陷。完整的复制了一套thread所提供的接口方便迁移,唯一的不同就是他使用了多进程而不是多线程。每个进程都有自己独立的GIL。但是在windows下多进程的开销要比多线程要大好多,Linux下是差不多的。多进程更加稳定,

    multiprocessing Process类代表一个进程对象。

    import multiprocessing as mp
    import threading as td
    import time
    
    def job(q):
        res = 0
        for i in range(100000):
            res += i + i **2
        q.put(res)
    
    def normal():
        res = 0
        for i in range(100000):
            res += i + i **2
        print 'normal:',res
    
    def multithread():
        q = mp.Queue() #这里用多进程的queue没问题的
        t1 = td.Thread(target = job,args = (q,))
    #     t2 = td.Thread(target = job(q,))
        t1.start()
    #     t2.start()
        t1.join()
    #     t2.join()
        res1 = q.get()
    #     res2 = q.get()
        print 'thread:',res1
    
    def multiprocess():
        q = mp.Queue()
        p1 = mp.Process(target = job,args = (q,))
    #     p2 = mp.Process(target = job(q,))
        p1.start()
    #     p2.start()
        p1.join()
    #     p2.join()
        res1 = q.get()
    #     res2 = q.get()
        print 'multiprocess:',res1
    
    if __name__ == '__main__':
        st = time.time()
        normal()
        st1 = time.time()
        print 'normal time:',st1 - st
        multithread()
        st2 = time.time()
        print 'thread:',st2 - st1
        multiprocess()
        print 'process:',time.time() - st2
    #进程池 ,Pool中是有return的
    import multiprocessing as mp
    def job(x):
        return x**2
    
    def multiprocess():
        pool = mp.Pool()   #默认是有几个核就用几个,可以自己设置processes = ?
        res = pool.map(job,range(10)) #可以放入可迭代对象,自动分配进程
        print res
        
        res = pool.apply_async(job,(2,)) #一次只能在一个进程里计算,要达到map的效果,要迭代
        print res.get()
        
        multi_res = [pool.apply_async(job,(i,)) for i in range(10)] #迭代器 
        print ([res.get() for res in multi_res])
    if __name__ == '__main__':
        multiprocess()
    

      

    #多进程中的global的全局变量 分给不同的cpu,难以交流
    #使用 shared memory 进行交流
    import multiprocessing as mp
    
    value = mp.Value('d',1) #d就是double,i是一个signed int
    array = mp.Array('i',[1,3,4]) #只是个一维的而已 ,和numpy的不一样
    

      

    #锁
    import multiprocessing as mp
    import time
    
    def job(v,num,l):
        l.acquire()
        for i in range(10):
            time.sleep(0.1)
            v.value += num
            print v.value
        l.release()
            
    def multiprocess():
        v = mp.Value('i',0) #共享内存
        l = mp.Lock()
        q = mp.Queue()
        p1 = mp.Process(target = job,args = (v,1,l))
        p2 = mp.Process(target = job,args = (v,3,l))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        multiprocess()
    

      

    fork操作:调用一次,返回两次。操作系统自动把当前进程复制一份,分布在父进程和子进程中返回,子进程永远返回0,父进程永远返回子进程的ID。子进程getppid()就可以拿到父进程的ID ,getpid()可以获得当前进程的ID。

  • 相关阅读:
    MySQL 多列索引优化小记
    Spring MVC + Security 4 初体验(Java配置版)
    Java程序通过代理访问网络
    Spring RESTful + Redis全注解实现恶意登录保护机制
    WinSCP 中普通用户以 root 身份登录 Linux
    Linux下修改系统时区
    Git如何检出指定目录或文件
    朴素贝叶斯
    console.log 被重写覆盖以后如何恢复
    MongoDB 基础命令使用学习记录
  • 原文地址:https://www.cnblogs.com/zephyr-1/p/6043785.html
Copyright © 2011-2022 走看看