zoukankan      html  css  js  c++  java
  • 2018年5月22日笔记

    • Python共享内存

    共享内存有两个结构,一个是 Value, 一个是 Array,这两个结构内部都实现了锁机制,因此是多进程安全的。

    Value 和 Array 都需要设置其中存放值的类型,d 是 double 类型,i 是 int 类型,具体的对应关系在Python 标准库的 sharedctypes 模块中查看。

    • 习题1
     1 from multiprocessing import Value, Array, Process
     2 
     3 def woker(arr):
     4     for i in range(len(arr)):
     5         arr[i] = -arr[i]
     6 
     7 if __name__ == '__main__':
     8     arr = Array('i', [x for x in range(10)])
     9     print(arr)
    10     print(arr[:])
    11     p = Process(target=woker, args=(arr,))
    12     p.start()
    13     p.join()            # 等子进程先执行完,否则两次print结果相同
    14     print(arr[:])
    <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_10 object at 0x102c76510>>
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    • Manager -- 专门用来做数据共享的模块

    上面的共享内存支持两种结构 Value 和 Array。 Python 中还有一个强大的Manager,专门用来做数据共享。

    其支持的类型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array。

    一个 Manager 对象是一个服务进程,推荐多进程程序中,数据共享就用一个 manager 管理。

    • 习题2
     1 from multiprocessing import Manager, Process
     2 
     3 def worker(dt, lt):
     4     for i in range(10):
     5         dt[i] = i*i
     6     lt += [x for x in range(11, 20)]
     7 
     8 if __name__ == '__main__':
     9     manager = Manager()
    10     dt = manager.dict()
    11     lt = manager.list()
    12     p = Process(target=worker, args=(dt, lt))
    13     p.start()
    14     p.join(timeout=3)
    15     print(type(dt))
    16     print(dt)
    17     print(type(lt))
    18     print(lt)
    <class 'multiprocessing.managers.DictProxy'>
    {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
    <class 'multiprocessing.managers.ListProxy'>
    [11, 12, 13, 14, 15, 16, 17, 18, 19]
    • pool -- 进程管理

    如果有50个任务要执行, 但是 CPU 只有4核, 你可以创建50个进程来做这个事情。但是大可不必,徒增管理开销。如果你只想创建4个进程,让他们轮流替你完成任务,不用自己去管理具体的进程的创建销毁,那 Pool 是非常有用的。

    Pool 是进程池,进程池能够管理一定的进程,当有空闲进程时,则利用空闲进程完成任务,直到所有任务完成为止。

    Pool 进程池创建4个进程,不管有没有任务,都一直在进程池中等候,等到有数据的时候就开始执行。
    Pool 的 API 列表如下:

      • apply(func[, args[, kwds]])
      • apply_async(func[, args[, kwds[, callback]]])
      • close()
      • terminate()
      • join()

    注意:在调用pool.join()之前,必须先调用pool.close(),否则会出错。执行完close()后不会有新的进程加入到pool中,join()会等待所有子进程结束。

    pool.apply_async  非阻塞(异步执行),定义的进程池最大进程数可同时执行

    pool.apply      阻塞,一个进程结束后释放回池,下一个进程才开始执行

    • 习题3
     1 import multiprocessing
     2 import time
     3 
     4 
     5 def fun(msg):
     6     print("#########start#### {0}".format(msg))
     7     time.sleep(3)
     8     print("#########end###### {0}".format(msg))
     9 
    10 
    11 if __name__ == '__main__':
    12     print("start main")
    13     pool = multiprocessing.Pool(processes=3)
    14     for i in range(1, 7):
    15         msg = "hello {0}".format(i)
    16         # pool.apply_async(fun, (msg,))# 执行时间 6s+
    17         pool.apply(fun, (msg,))   #执行时间 6*3=18+
    18     pool.close()#在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool
    19     pool.join()#join 是等待所有的子进程结束
    20     print("end main")
    start main
    #########start#### hello 1
    #########end###### hello 1
    #########start#### hello 2
    #########end###### hello 2
    #########start#### hello 3
    #########end###### hello 3
    #########start#### hello 4
    #########end###### hello 4
    #########start#### hello 5
    #########end###### hello 5
    #########start#### hello 6
    #########end###### hello 6
    end main
     1 import multiprocessing
     2 import time
     3 
     4 
     5 def fun(msg):
     6     print("#########start#### {0}".format(msg))
     7     time.sleep(3)
     8     print("#########end###### {0}".format(msg))
     9 
    10 
    11 if __name__ == '__main__':
    12     print("start main")
    13     pool = multiprocessing.Pool(processes=3)
    14     for i in range(1, 7):
    15         msg = "hello {0}".format(i)
    16         pool.apply_async(fun, (msg,))# 执行时间 6s+
    17         # pool.apply(fun, (msg,))   #执行时间 6*3=18+
    18     pool.close()#在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool
    19     pool.join()#join 是等待所有的子进程结束
    20     print("end main")
    start main
    #########start#### hello 1
    #########start#### hello 2
    #########start#### hello 3
    #########end###### hello 1
    #########end###### hello 2
    #########end###### hello 3
    #########start#### hello 4
    #########start#### hello 5
    #########start#### hello 6
    #########end###### hello 5
    #########end###### hello 6
    #########end###### hello 4
    end main
    • threading -- 多线程,IO密集型操作

    多线程实现方式有两种:

      方法一:将要执行的方法作为参数传给Thread的构造方法(和多进程类似)

          t = threading.Thread(target=func, args=(i,))

      方法二:从Thread()继承,并重写run()

    线程 < 进程,1个父进程中含多个线程。

    多线程和多进程的不同之处在于:多线程本身是可以和父进程共享内存的。这也是为什么其中一个线程挂掉会导致其他线程也死掉的道理。

    • 习题4
     1 import threading
     2 import time
     3 
     4 def worker(args):
     5     print("开始子进程 {0}".format(args))
     6     time.sleep(args)
     7     print("结束子进程 {0}".format(args))
     8 
     9 if __name__ == '__main__':
    10 
    11     print("start main")
    12     t1 = threading.Thread(target=worker, args=(1,))
    13     t2 = threading.Thread(target=worker, args=(2,))
    14     t1.start()
    15     t2.start()
    16     t1.join()
    17     t2.join()
    18     print("end main")
    start main
    开始子进程 1
    开始子进程 2
    结束子进程 1
    结束子进程 2
    end main
    • 习题5
     1 import threading
     2 import time
     3 
     4 class Hello(threading.Thread):
     5     def __init__(self, args):
     6         super(Hello, self).__init__()
     7         self.args = args
     8 
     9     def run(self):
    10         print("开始子进程 {0}".format(self.args))
    11         time.sleep(1)
    12         print("结束子进程 {0}".format(self.args))
    13 
    14 if __name__ == '__main__':
    15 
    16     a = 1
    17     print("start main")
    18     t1 = Hello(1)
    19     t2 = Hello(2)
    20     t1.start()
    21     t2.start()
    22     print("end main")
    start main
    开始子进程 1
    开始子进程 2
    end main
    结束子进程 1
    结束子进程 2
    • 习题6
     1 import threading
     2 import time
     3 
     4 class Hello(threading.Thread):
     5     def __init__(self, args):
     6         super(Hello, self).__init__()
     7         self.args = args
     8         global a
     9         print("a = {0}".format(a))
    10         a += 1
    11 
    12     def run(self):
    13         print("开始子进程 {0}".format(self.args))
    14         print("结束子进程 {0}".format(self.args))
    15 
    16 if __name__ == '__main__':
    17     a = 1
    18     print("start main")
    19     t1 = Hello(5)
    20     time.sleep(3)
    21     t2 = Hello(5)
    22     t1.start()
    23     t2.start()
    24     print("#####a = {0}####".format(a))
    25     print("end main")
    start main
    a = 1
    a = 2
    开始子进程 5
    结束子进程 5
    开始子进程 5
    结束子进程 5
    #####a = 3####
    end main
    • 习题7 (线程池)
     1 import threadpool
     2 
     3 def hello(m, n, o):
     4     print("m = {0}  n={1}  o={2}".format(m, n, o))
     5 
     6 if __name__ == '__main__':
     7     # 方法1
     8     lst_vars_1 = ['1', '2', '3']
     9     lst_vars_2 = ['4', '5', '6']
    10     func_var = [(lst_vars_1, None), (lst_vars_2, None)]
    11     # 方法2
    12     # dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'}
    13     # dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'}
    14     # func_var = [(None, dict_vars_1), (None, dict_vars_2)]
    15 
    16     pool = threadpool.ThreadPool(2)
    17     requests = threadpool.makeRequests(hello, func_var)
    18     [pool.putRequest(req) for req in requests]
    19     pool.wait()
    m = 1  n=2  o=3
    m = 4  n=5  o=6
  • 相关阅读:
    剑指offer39-平衡二叉树
    剑指offer37-数字在排序数组中出现的次数
    剑指offer36-两个链表的第一个公共结点
    剑指offer31-整数中1出现的次数
    剑指offer30-连续子数组的最大和
    剑指offer28-数组中出现次数超过一半的数字
    剑指offer26-二叉搜索树与双向链表
    剑指offer21-栈的压入、弹出序列
    剑指offer16-合并两个排序的链表
    C#-杂碎
  • 原文地址:https://www.cnblogs.com/karl-python/p/9080421.html
Copyright © 2011-2022 走看看