zoukankan      html  css  js  c++  java
  • python多进程(二)

    之前实现的数据共享的方式只有两种结构Value和Array。Python中提供了强大的Manager专门用来做数据共享的,Manager是进程间数据共享的高级接口。 Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array。

    下面看一个用于多进程共享dict和list数据:

    from multiprocessing import Manager, Process
    
    
    def worker(dt, lt):
        for i in range(10):
            dt[i] = i*i  # 访问共享数据
    
        lt += [x for x in range(11, 16)]
    
    if __name__ == '__main__':
        manager = Manager()
        dt = manager.dict()
        lt = manager.list()
        p = Process(target=worker, args=(dt, lt))
        p.start()
        p.join(timeout=3)
        print(dt)
        print(lt)
    结果: {0: 0,
    1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81} [11, 12, 13, 14, 15]

    进程池

    当使用Process类管理非常多(几十上百个)的进程时,就会显得比较繁琐,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。

    import time
    
    import multiprocessing
    
    
    def fun(msg):
        print("#########start#### {0}".format(msg))
        time.sleep(3)
        print("#########end###### {0}".format(msg))
    
    
    if __name__ == '__main__':
        print("start main")
        pool = multiprocessing.Pool(processes=3) #在进程池中创建3个进程
        for i in range(1, 7):
            msg = "hello {0}".format(i)
            pool.apply_async(fun, (msg,))# 执行时间6s+
            # pool.apply(fun, (msg,))    #执行时间 6*3=18s+
        pool.close()   #在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool
        pool.join()    #join 是等待所有的子进程结束,必须在close之后调用
        print("end main")
    
    结果:
    start main
    start hello 1
    start hello 2
    start hello 3
    end hello 1
    start hello 4
    end hello 2
    start hello 5
    end hello 3
    start hello 6
    end hello 4
    end hello 5
    end hello 6
    end main
    6.18000006676  # 使用pool.apply_async()所花费时间
    18.2129998207  # 使用pool.apply()所花费时间

    pool.apply_async   非阻塞,定义的进程池最大数的同时执行

    pool.apply         一个进程结束,释放回进程池,开始下一个进程

    从上面的结果中我们可以看到,使用pool.apply_async所花费的时间较少。

    多线程

    Python中提供了threading模块来对多线程进行操作,线程是应用程序中工作的最小单元,多线程适用于密集型io。

    多线程的实现方式有两种:
    方法一:将要执行的方法作为参数传给Thread的构造方法(和多进程类似)
    t = threading.Thread(target=action, args=(i,))

    方法二:从Thread类继承,并重写run()方法。

    方法一示例:

    import threading
    
    def worker(args):
        print("开始子进程 {0}".format(args))
        print("结束子进程 {0}".format(args))
    
    if __name__ == '__main__':
    
        print("start main")
        t1 = threading.Thread(target=worker, args=(1,))  # 和多进程类似
        t2 = threading.Thread(target=worker, args=(2,))
        t1.start()
        t2.start()
        print("end main")
        
    结果:
    start main
    开始子进程 1
    结束子进程 1
    开始子进程 2
    end main
    结束子进程 2

    方法二示例:

    import threading
    
    import time
    
    
    class Hello(threading.Thread):
    # 对run()方法进行重写
    def __init__(self, args): super(Hello, self).__init__() self.args = args def run(self): print("开始子进程 {0}".format(self.args)) time.sleep(1) print("结束子进程 {0}".format(self.args)) if __name__ == '__main__': a = 1 print("start main") t1 = Hello(1) t2 = Hello(2) t1.start() t2.start() print("end main") 结果: start main 开始子进程 1 开始子进程 2 end main 结束子进程 2 结束子进程 1

    线程锁

    当多线程争夺锁时,允许第一个获得锁的线程进入临街区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临街区,并释放锁。
    需要注意,那些阻塞的线程是没有顺序的。

    import time
    import threading
    
    def sub():
        global count
        lock.acquire()  #上锁,第一个线程如果申请到锁,会在执行公共数据的过程中持续阻塞后续线程
                        #即后续第二个或其他线程依次来了发现已经被上锁,只能等待第一个线程释放锁
                        #当第一个线程将锁释放,后续的线程会进行争抢
    
        '''线程的公共数据'''
        temp=count
        print("temp is {0}".format(temp))
        time.sleep(0.001)
        count=temp+1
        print("count is {0}".format(count))
        '''线程的公共数据'''
    
        lock.release()  # 释放锁
        time.sleep(2)
    count=0
    
    l=[]
    lock=threading.Lock()   
    for i in range(100):
        t=threading.Thread(target=sub,args=())
        t.start()
        l.append(t)
    for t in l:
        t.join()
    print(count)
    
    结果:
    100

    线程池


    采用线程池,花费的时间更少。

    语法结构示例:

    import threadpool  
    def ThreadFun(arg1,arg2):  
        pass  
    def main():  
        device_list=[object1,object2,object3......,objectn]#需要处理的设备个数  
        task_pool=threadpool.ThreadPool(8)#8是线程池中线程的个数  
        request_list=[]#存放任务列表  
        #首先构造任务列表  
        for device in device_list:  
            request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))  
        #将每个任务放到线程池中,等待线程池中线程各自读取任务,然后进行处理,使用了map函数,不了解的可以去了解一下。  
        map(task_pool.putRequest,request_list)  
        #等待所有任务处理完成,则返回,如果没有处理完,则一直阻塞  
        task_pool.poll()  
    if __name__=="__main__":  
        main()

    pip install threadpool   # 安装threadpool
    from threadpool import *   #导入模块
    pool = ThreadPool(size) # 定义一个线程池,最多能创建size个线程
    requests = makeRequests() # 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行;
    [pool.putRequest(req) for req in requests] # 将所有要运行多线程的请求扔进线程池
    pool.wait() # 等待所有的线程完成工作后退出。

    [pool.putRequest(req) for req in requests]等同于:
    for req in requests:
        pool.putRequest(req)

    例子:

    import threadpool
    
    
    def hello(m, n, o):
        print("m = {0}  n={1}  o={2}".format(m, n, o))
    
    if __name__ == '__main__':
        # 方法1
        lst_vars_1 = ['1', '2', '3']   # 需要处理的线程个数
        lst_vars_2 = ['4', '5', '6']
        func_var = [(lst_vars_1, None), (lst_vars_2, None)]
        # 方法2
        # dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'}
        # dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'}
        # func_var = [(None, dict_vars_1), (None, dict_vars_2)]
    
        pool = threadpool.ThreadPool(2)  # 线程池中的线程个数
        requests = threadpool.makeRequests(hello, func_var)  # 创建了要开启多线程的函数,创建需要线程池处理的任务,只需要两个参数
        [pool.putRequest(req) for req in requests]  # 将每个任务放入到线程池中
        pool.wait()
    
    结果:
    m = 1  n=2  o=3
    m = 4  n=5  o=6
  • 相关阅读:
    开发记录4
    开发记录3
    豆瓣的基础架构读后感
    开发记录2
    开发记录1
    大数据技术大作业1
    新浪微博平台架构读后感
    第一阶段冲刺第五天
    第一阶段冲刺第四天
    第一阶段冲刺第三天
  • 原文地址:https://www.cnblogs.com/yangjian319/p/9079325.html
Copyright © 2011-2022 走看看