zoukankan      html  css  js  c++  java
  • python 线程与进程

    python多进程与多线程

      1 ##############################
      2 #多进程
      3 ##############################
      4 #import os
      5 '''
      6 print('1Process start, pid:',os.getpid())
      7 print('Process start, ppid:',os.getppid())
      8 '''
      9 '''
     10 #unix环境支持
     11 pid = os.fork()
     12 if pid == 0:
     13     print('child process,id:', os.getpid(), 'parent id is:', os.getppid())
     14 else:
     15     print('pid:', os.getpid(), 'create child process:', pid)
     16 '''
     17 
     18 #################
     19 #multiprocessing
     20 #################
     21 '''
     22 from multiprocessing import Process
     23 print('2Process start, pid:',os.getpid())
     24 def run_proc(name):
     25 #    while(1):
     26 #        pass
     27     print('run child process %s (%s)...' %(name, os.getpid()))
     28 
     29 print('4Process start, pid:',os.getpid())
     30 if __name__ == '__main__':
     31     print('1Parent process %s.' % os.getpid())
     32     p = Process(target=run_proc, args=('test',))
     33     print('Child process will start.', os.getpid())
     34     #启动子进程
     35     p.start()
     36     print('3Process start, pid:',os.getpid())
     37     #等待子进程结束, 类似于wait()
     38     p.join()
     39     print('Child process end.', os.getpid())
     40 '''
     41 
     42 #由上面这个例子可以看出,python的代码是顺序执行的。
     43 #执行p.start()方法后,开始执行子进程
     44 #子进程的执行范围是2、4之间的代码行
     45 #p.join()等待子进程返回
     46 #单纯从打印的顺序,无法准确的判断父子进程的执行顺序
     47 
     48 
     49 '''
     50 from multiprocessing import Pool
     51 import os, time, random
     52 def long_time_task(name):
     53     print('Run task %s (%s)...' % (name, os.getpid()))
     54     start = time.time()
     55     time.sleep(random.random()*3)
     56     end = time.time()
     57     print('Task %s runs %0.2f seconds.' % (name, (end - start)))
     58 
     59 if __name__=='__main__':
     60     print('Parent process %s.' % os.getpid())
     61     p = Pool(5)#设置最大线程格个数,默认值是cpu核心数
     62     for i in range(5):
     63         p.apply_async(long_time_task, args=(i,))#创建线程
     64     print('Waiting for all subprocesses done...')
     65     p.close()#防止将任何其他任务提交到池中。完成所有任务后,工作进程将退出。
     66     p.join()#调用前,必须调用close() or terminate()
     67     print('All subprocess done.')
     68 '''
     69 #win7下cmd输出内容如下
     70 '''
     71 Parent process 25380.
     72 Waiting for all subprocesses done...
     73 Run task 0 (26008)...
     74 Run task 1 (26216)...
     75 Run task 2 (28108)...
     76 Run task 3 (24732)...
     77 Run task 4 (25008)...
     78 Task 3 runs 0.15 seconds.
     79 Task 4 runs 1.79 seconds.
     80 Task 2 runs 2.45 seconds.
     81 Task 0 runs 2.69 seconds.
     82 Task 1 runs 2.98 seconds.
     83 All subprocess done.
     84 请按任意键继续. . .
     85 '''
     86 
     87 ###########
     88 #子进程
     89 ###########
     90 '''
     91 import subprocess #类似于exec()?
     92 
     93 print ('$ nslookup www.python.org')
     94 r = subprocess.call(['nslookup', 'www.python.org'])
     95 print('Exit code:', r)
     96 '''
     97 ''' win7输出
     98 $ nslookup www.python.org
     99 服务器:  clou-ad.szclou.com
    100 Address:  10.98.94.5
    101 
    102 非权威应答:
    103 名称:    dualstack.python.map.fastly.net
    104 Addresses:  2a04:4e42:36::223
    105           151.101.228.223
    106 Aliases:  www.python.org
    107 
    108 Exit code: 0
    109 请按任意键继续. . .
    110 '''
    111 '''
    112 import subprocess
    113 print('$ nslookup')
    114 p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    115 output, err = p.communicate(b'set q=mx
    python.org
    exit
    ')
    116 s='utf-8'#unix
    117 s1='gbk'#win7系统
    118 print(output.decode(s))
    119 print('Exit code:', p.returncode)
    120 '''
    121 #相当于命令行下执行nslookup,然后输入
    122 #set q=mx
    123 #python.org
    124 #exit
    125 
    126 '''win7输出
    127 $ nslookup
    128 默认服务器:  clou-ad.szclou.com
    129 Address:  10.98.94.5
    130 
    131 > > 服务器:  clou-ad.szclou.com
    132 Address:  10.98.94.5
    133 
    134 python.org      MX preference = 50, mail exchanger = mail.python.org
    135 
    136 mail.python.org internet address = 188.166.95.178
    137 mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1
    138 >
    139 Exit code: 0
    140 请按任意键继续. . .
    141 '''
    142 
    143 ###########
    144 #进程间通信
    145 ###########
    146 '''
    147 from multiprocessing import Process, Queue
    148 import os, time, random
    149 
    150 def write(q):
    151     print('Process to write: %s' % os.getpid())
    152     for value in ['A', 'B', 'C']:
    153         print('put %s to queue.' % value)
    154         q.put(value)
    155         time.sleep(random.random())
    156 
    157 def read(q):
    158     print('Process to read: %s' % os.getpid())
    159     while True:
    160         value = q.get(True)
    161         print('Get %s from queue.' % value)
    162 
    163 if __name__=='__main__':
    164     q=Queue()
    165     pw = Process(target=write, args=(q,))
    166     pr = Process(target=read, args=(q,))
    167 
    168     pw.start()
    169     pr.start()
    170     pw.join()
    171     pr.terminate()
    172 '''
    173 '''
    174 Process to write: 28536
    175 Process to read: 27968
    176 put A to queue.
    177 Get A from queue.
    178 put B to queue.
    179 Get B from queue.
    180 put C to queue.
    181 Get C from queue.
    182 请按任意键继续. . .
    183 '''
    184 
    185 ##############################
    186 #多线程:python的多线程是真正的Posix Thread,而不是模拟出来的(linux)
    187 ##############################
    188 
    189 #Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,
    190 #threading是高级模块,对_thread进行了封装。绝大多数情况下,
    191 #我们只需要使用threading这个高级模块。
    192 '''
    193 import time, threading
    194 
    195 print('thread %s is running...' % threading.current_thread().name)
    196 
    197 def loop():
    198     print('thread %s is running...' % threading.current_thread().name)
    199     n = 0
    200     while n < 5:
    201         n = n + 1
    202         print('thread %s >>> %s' % (threading.current_thread().name, n))
    203         time.sleep(1)
    204     print('thread %s ended.' % threading.current_thread().name)
    205     
    206 
    207 print('print something')
    208 t=threading.Thread(target=loop, name='LoopThread')
    209 t.start()
    210 t.join()
    211 print('thread %s ended.' % threading.current_thread().name)
    212 '''
    213 '''
    214 thread MainThread is running...
    215 thread LoopThread is running...
    216 thread LoopThread >>> 1
    217 thread LoopThread >>> 2
    218 thread LoopThread >>> 3
    219 thread LoopThread >>> 4
    220 thread LoopThread >>> 5
    221 thread LoopThread ended.
    222 thread MainThread ended.
    223 请按任意键继续. . .
    224 '''
    225 '''
    226 任何进程默认启动一个进程,名字是MainThread。
    227 threading.current_thread()返回当前线程的实例
    228 线程创建时,如果不指定名称,默认是Thread-1,.......
    229 '''
    230 #定义锁
    231 '''
    232 lock = threading.Lock()
    233 lock.acquire()
    234 lock.release()
    235 '''
    236 
    237 import time, threading, multiprocessing
    238 '''
    239 def loop():
    240     x = 0
    241     while True:
    242         x = x ^ 1
    243 
    244 print('cpu:%s' % multiprocessing.cpu_count())
    245 for i in range(multiprocessing.cpu_count()):
    246     t = threading.Thread(target=loop)
    247     t.start()
    248 '''
    249 
    250 
    251 ###########
    252 #ThreadLocal:管理线程数据的全局变量
    253 ###########
    254 '''
    255 import threading
    256 #全局变量local_school,一个ThreadLocal对象
    257 #但每个属性,如local_school.student都是线程的局部变量,不会相互干扰
    258 
    259 #ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,
    260 #用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
    261 local_school = threading.local()
    262 
    263 def process_student():
    264     std = local_school.student
    265     print('Hello, %s (in %s)' % (std, threading.current_thread().name))
    266 
    267 def process_thread(name):
    268     local_school.student = name
    269     process_student()
    270 
    271 t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
    272 t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
    273 t1.start()
    274 t2.start()
    275 t1.join()
    276 t2.join()
    277 '''
    278 
    279 '''
    280 计算密集型:cpu利用率高,C语言的运行效率最高,脚本语言效率低
    281 IO密集型:脚本语言的开发效率高,c语言最差
    282 协程:单线程的一步编程模型。有了协程的支持,就可以基于事件驱动编写高效的多任务程序
    283 '''
    284 
    285 
    286 ###########
    287 #分布式进程
    288 ###########
    multiProcess.py

    分布式进程示例:

     1 #task_master.py
     2 import random, time, queue
     3 from multiprocessing.managers import BaseManager
     4 #发送任务队列
     5 task_queue = queue.Queue()
     6 #接收结果队列
     7 result_queue = queue.Queue()
     8 
     9 #从BaseManager继承的QueueManager
    10 class QueueManager(BaseManager):
    11     pass
    12 
    13 # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
    14 QueueManager.register('get_task_queue', callable=lambda:task_queue)
    15 QueueManager.register('get_result_queue', callable=lambda:result_queue)
    16 
    17 #创建实例对象
    18 #绑定端口5000, 设置验证码 'abc'
    19 manager = QueueManager(address=('', 5000), authkey=b'abc')
    20 manager.start()
    21 #获得通过网络访问的Queue对象
    22 task = manager.get_task_queue()
    23 result = manager.get_result_queue()
    24 
    25 time.sleep(10)
    26 #队列中添加任务
    27 for i in range(10):
    28     n = random.randint(0, 10000)
    29     print('Put task %d...' % n)
    30     task.put(n)
    31 #读取结果
    32 print('Try get results...')
    33 for i in range(10):
    34     r = result.get(timeout=10)
    35     print('Result:%s' % r)
    36 
    37 manager.shutdown()
    38 print('master exit.')
     1 #task_worker.py
     2 import queue, time, sys
     3 from multiprocessing.managers import BaseManager
     4 
     5 class QueueManager(BaseManager):
     6     pass
     7 
     8 QueueManager.register('get_task_queue')
     9 QueueManager.register('get_result_queue')
    10 
    11 server_addr = '127.0.0.1'
    12 print('Connect to server %s...' % server_addr)
    13 m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    14 m.connect()
    15 task = m.get_task_queue()
    16 result = m.get_result_queue()
    17 
    18 '''
    19 for i in range(10):
    20     try:
    21         n = task.get(timeout=1)
    22         print('run task %d * %d...' % (n, n))
    23         r = '%d * %d = %d' % (n, n, n*n)
    24         time.sleep(1)
    25         result.put(r)
    26     except queue.Empty:
    27         print('task queue is empty.')
    28 '''
    29 t1 = time.time()
    30 while 1:
    31 #    if not isinstance(task, queue.Queue()):
    32 #        raise ValueError('invalid Queue:%s' % task)
    33     try:
    34         if task.empty():
    35             time.sleep(1)
    36             print('queue is empty')
    37             continue
    38         n = task.get(timeout=1)
    39         print('run task %d * %d...' % (n, n))
    40         r = '%d * %d = %d' % (n, n, n*n)
    41         result.put(r)
    42         time.sleep(1)
    43         if time.time() - t1 > 60:
    44             print('over 60s')
    45             break
    46     except BaseException as f:
    47         print('Error:', f)
    48         break;
    49 
    50 print('worker exit.')

    可以多次运行 task_worker.py,启动多个 worker 进程;所有 worker 处理过的任务合起来正好是 master 放入队列的全部任务,且同一个任务不会被两个 worker 重复处理。

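    目前只在 Ubuntu 12.04 上验证通过。win7 下无法正常执行,原因更可能不是权限问题,而是以下几点:
    - Windows 上 multiprocessing 以 spawn 方式启动 manager 的服务进程,需要 pickle 传给 register() 的 callable,而 lambda 无法被 pickle;
    - 启动代码必须放在 if __name__ == '__main__': 之下;
    - master 绑定 ''(即 0.0.0.0)后,会用 0.0.0.0 连接自己的服务进程,这在 Windows 上会失败,需改为绑定 127.0.0.1 或本机实际 IP。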

  • 相关阅读:
    跃迁方法论 Continuous practice
    EPI online zoom session 面试算法基础知识直播分享
    台州 OJ 2648 小希的迷宫
    洛谷 P1074 靶形数独
    洛谷 P1433 DP 状态压缩
    台州 OJ FatMouse and Cheese 深搜 记忆化搜索
    台州 OJ 2676 Tree of Tree 树状 DP
    台州 OJ 2537 Charlie's Change 多重背包 二进制优化 路径记录
    台州 OJ 2378 Tug of War
    台州 OJ 2850 Key Task BFS
  • 原文地址:https://www.cnblogs.com/mofei004/p/9438264.html
Copyright © 2011-2022 走看看