zoukankan      html  css  js  c++  java
  • day8-多进程和多线程

    Python线程

    Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    import threading
    import time
     
    def sayhi(num): #定义每个线程要运行的函数
     
        print("running on number:%s" %num)
     
        time.sleep(3)
     
    if __name__ == '__main__':
     
        t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
        t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
     
        t1.start() #启动线程
        t2.start() #启动另一个线程
     
        print(t1.getName()) #获取线程名
        print(t2.getName())

    上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

    更多方法:

      • start            线程准备就绪,等待CPU调度
      • setName      为线程设置名称
      • getName      获取线程名称
      • setDaemon   设置为后台线程或前台线程(默认)
                           如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                            如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
      • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
      • run              线程被cpu调度后执行Thread类对象的run方法

     

    通过继承threading类的方式创建多线程:

    #!/usr/bin/env  python
    #coding:utf8
    import threading
    
    class Mytest(threading.Thread):
        def __init__(self,num):
            super(Mytest,self).__init__()
            self.num=num
    
        def run(self):  #定义每个线程要运行的函数
            print self.num
    
    
    if __name__=='__main__':
        for i in range(10):
            P1=Mytest(i)
            P1.start()
    setDaemon的作用:
    #!/usr/bin/env  python
    #coding:utf8
    import threading
    import time
    
    class Mytest(threading.Thread):
        def __init__(self,num):
            super(Mytest,self).__init__()
            self.num=num
    
        def run(self):
            time.sleep(0.5)
            print self.num
            print 'hahaha'
    
    if __name__=='__main__':
        for i in range(5):
            P1=Mytest(i)
            P1.setDaemon(True)  ##将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务
            P1.start()
            print '---test---'
    print '----end----'

    ---test---
    ---test---
    ---test---
    ---test---
    ---test---
    ----end----

    Process finished with exit code 0

    主线程创建了5个子线程  主线程 P1.setDaemon(True)设置为守护线程  主线程执行完就退出不会等待子线程

    join的作用:

    #!/usr/bin/env  python
    #coding:utf8
    import threading
    import time
    
    class Mytest(threading.Thread):
        def __init__(self,num):
            super(Mytest,self).__init__()
            self.num=num
    
        def run(self):
            time.sleep(0.5)
            print self.num
    
    if __name__=='__main__':
        lists=[]
        for i in range(5):
            P1=Mytest(i)
            P1.start()
            lists.append(P1)
            print '---test---'
    
        for j in lists:
            j.join()
    
    print '----end----'
    ---test---
    ---test---
    ---test---
    ---test---
    ---test---
    0
    1
    42
    
    3
    ----end----
    
    Process finished with exit code 0

    使用join之后会等待每个子线程结束之后才会继续向下执行

    线程锁(互斥锁Mutex)

    一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

    import time
    import threading
     
    def addNum():
        global num #在每个线程中都获取这个全局变量
        print('--get num:',num )
        time.sleep(1)
        num  -=1 #对此公共变量进行-1操作
     
    num = 100  #设定一个共享变量
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)
     
    for t in thread_list: #等待所有线程执行完毕
        t.join()
     
     
    print('final num:', num )

    正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

    *注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

    加锁版本

    import time
    import threading
     
    def addNum():
        global num #在每个线程中都获取这个全局变量
        print('--get num:',num )
        time.sleep(1)
        lock.acquire() #修改数据前加锁
        num  -=1 #对此公共变量进行-1操作
        lock.release() #修改后释放
     
    num = 100  #设定一个共享变量
    thread_list = []
    lock = threading.Lock() #生成全局锁
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)
     
    for t in thread_list: #等待所有线程执行完毕
        t.join()
     
    print('final num:', num )

      

    RLock(递归锁)

    说白了就是在一个大锁中还要再包含子锁

    import threading,time
     
    def run1():
        print("grab the first part data")
        lock.acquire()
        global num
        num +=1
        lock.release()
        return num
    def run2():
        print("grab the second part data")
        lock.acquire()
        global  num2
        num2+=1
        lock.release()
        return num2
    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res,res2)
     
     
    if __name__ == '__main__':
     
        num,num2 = 0,0
        lock = threading.RLock()
        for i in range(10):
            t = threading.Thread(target=run3)
            t.start()
     
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num,num2)

      

    Semaphore(信号量)

    互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    import threading,time
     
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s
    " %n)
        semaphore.release()
     
    if __name__ == '__main__':
     
        num= 0
        semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
        for i in range(20):
            t = threading.Thread(target=run,args=(i,))
            t.start()
     
    while threading.active_count() != 1:
        pass #print threading.active_count()
    else:
        print('----all threads done---')
        print(num)

     

    通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

    import threading,time
    import random
    def light():
        if not event.isSet():
            event.set() #wait就不阻塞 #绿灯状态
        count = 0
        while True:
            if count < 10:
                print('33[42;1m--green light on---33[0m')
            elif count <13:
                print('33[43;1m--yellow light on---33[0m')
            elif count <20:
                if event.isSet():
                    event.clear()
                print('33[41;1m--red light on---33[0m')
            else:
                count = 0
                event.set() #打开绿灯
            time.sleep(1)
            count +=1
    def car(n):
        while 1:
            time.sleep(random.randrange(10))
            if  event.isSet(): #绿灯
                print("car [%s] is running.." % n)
            else:
                print("car [%s] is waiting for the red light.." %n)
    if __name__ == '__main__':
        event = threading.Event()
        Light = threading.Thread(target=light)
        Light.start()
        for i in range(3):
            t = threading.Thread(target=car,args=(i,))
            t.start()

    多进程multiprocessing

    from multiprocessing import Process
    import os
     
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        print("
    
    ")
     
    def f(name):
        info('33[31;1mfunction f33[0m')
        print('hello', name)
     
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    进程间通讯  

    不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

    Queues

    使用方法跟threading里的queue差不多

    from multiprocessing import Process, Queue
     
    def f(q):
        q.put([42, None, 'hello'])
     
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()

    Pipes

    The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

    from multiprocessing import Process, Pipe
     
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
     
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()
     
     
    Managers

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

    from multiprocessing import Process, Manager
     
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(1)
        print(l)
     
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
     
            l = manager.list(range(5))
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
     
            print(d)
            print(l)

      

     
     

    进程同步

    Without using the lock output from the different processes is liable to get all mixed up.

    from multiprocessing import Process, Lock
     
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
     
    if __name__ == '__main__':
        lock = Lock()
     
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

      

    进程池  

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    from  multiprocessing import Process,Pool
    import time
     
    def Foo(i):
        time.sleep(2)
        return i+100
     
    def Bar(arg):
        print('-->exec done:',arg)
     
    pool = Pool(5)
     
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        #pool.apply(func=Foo, args=(i,))
     
    print('end')
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
  • 相关阅读:
    文本特殊符号汇集
    十大编程算法助程序员走上高手之路
    单例模式(Singleton)
    flink time and watermark
    关于maven依赖关系的问题
    幂等
    乐观锁和悲观锁的一个例子
    Elasticsearch logstash filter
    ELK filebeat的安装
    使用 Python 验证数据集中的体温是否符合正态分布
  • 原文地址:https://www.cnblogs.com/xuyanmei/p/5275854.html
Copyright © 2011-2022 走看看