进程
# *********通过process创建单个进程 from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) # 获取当前进程的名字,pid值 if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) # 传入子进程的函数名,给子进程起个名字 print('Child process will start.') p.start() p.join() print('Child process end.') # Parent process 9976. # Child process will start. # Run child process test (9368)... # Child process end. # *************通过Pool创建多个进程,Pool内的参数为最多同时执行的进程数 from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') # Parent process 8808. # Waiting for all subprocesses done... # Run task 0 (2956)... # Run task 1 (5296)... # Run task 2 (8432)... # Run task 3 (8412)... # Task 3 runs 0.10 seconds. # Run task 4 (8412)... # Task 2 runs 0.41 seconds. # Task 4 runs 1.61 seconds. # Task 1 runs 1.90 seconds. # Task 0 runs 2.09 seconds. # All subprocesses done. #****************** subprocess可以让我们方便的启动一个子进程,然后控制其输入和输出,nslookup是终端下的一个命令,能查到指定网址的ip import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) # 创建了一个子进程并输入了 nslookup www.python.org print('Exit code:', r) ''' ''' #************** 如果子进程还需要输入,则可以通过communicate()方法输入 import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx python.org exit ') print(output.decode('gbk')) # print(output.decode('utf-8')) 源码是utf-8 但是会报错,看底下的评论是编码问题,换成gbk就好了 # 因为pycharm默认的是utf-8,这也导致了上面的例子有乱码, '$ nslookup www.baidu.com'相当于是运行在cmd中的命令,命令运行后显示的结果也是cmd中的结果, # 而cmd本身默认编码是GBK的,所以会出现乱码。解决方法:File>>Settings>>File Encodings>>Project Encoding:(选择GBK) print('Exit code:', p.returncode) # 上面的代码相当于在命令行执行命令nslookup,然后手动输入: # set q=mx # python.org # exit # 输出结果如下 # $ nslookup # Server: 192.168.19.4 # Address: 192.168.19.4#53 # # Non-authoritative answer: # python.org mail exchanger = 50 mail.python.org. # # Authoritative answers can be found from: # mail.python.org internet address = 82.94.164.166 # mail.python.org has AAAA address 2001:888:2000:d::a6 # # # Exit code: 0 #*************进程间通信 from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): # 通过q 传入Queue() 队列 这样就可以保存数据了 print('Process to read: %s' % os.getpid()) while True: value = q.get(True) # 这里的True其实是一个参数(block=True) ,表示当queue中没有数据的时候阻塞 print('Get %s from queue.' % value) if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate()
#****使用threading 创建线程 import time, threading # 新线程执行的代码: def loop(): print('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') # 创建线程, target选择线程的函数 ,name 给这个线程起个名字 t.start() # 开始线程 t.join() # 等待线程结束 print('thread %s ended.' % threading.current_thread().name) # thread MainThread is running... # thread LoopThread is running... # thread LoopThread >>> 1 # thread LoopThread >>> 2 # thread LoopThread >>> 3 # thread LoopThread >>> 4 # thread LoopThread >>> 5 # thread LoopThread ended. # thread MainThread ended. # *** Lock 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享, # 所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了 import time, threading # 假定这是你的银行存款: balance = 0 def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) # 开启线程,线程函数run_thread 传入的参数 n = 5 t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) # 这样可能会出问题,因为 t1 ,t2是交替运行的,可能会把balance的值改乱了 # 如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时, # 我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后, # 获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。 # 创建一个锁就是通过threading.Lock()来实现 import time, threading # 假定这是你的银行存款: balance = 0 def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要释放锁: lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) # 开启线程,线程函数run_thread 传入的参数 n = 5 t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) balance = 0 lock = threading.Lock() # 当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁, # 然后继续执行代码,其他线程就继续等待直到获得锁为止。获得锁的线程用完后一定要释放锁, # 否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放
#***ThreadLocal多线程中函数的传参问题 def process_student(name): # 这个函数作为一个线程函数,通过这个函数创建多个线程, # 执行这个函数时再调用 do_task_1() 函数 和 do_task_2(std)函数 std = Student(name) # std是局部变量,但是每个函数都要用它,因此必须传进去: do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) # 只能从上层函数一层一层的传参,不能把str作为全局变量,因为每个线程的name都是不同的,所以std也是不同的 do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std) #如果用一个全局dict存放所有的Student对象,然后以thread自身作为key获得线程对应的Student对象,这样就不必一层层传函数了 global_dict = {} def std_thread(name): std = Student(name) # 把std放到全局变量global_dict中: global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1(): # 不传入std,而是根据当前线程查找: std = global_dict[threading.current_thread()] # 不用从上层函数获取参数,直接在全局变量中找参数 ... def do_task_2(): # 任何函数都可以查找出当前线程的std变量: std = global_dict[threading.current_thread()] ... #这种方式理论上是可行的,它最大的优点是消除了std对象在每层函数中的传递问题,但是,每个函数获取std的代码有点丑

# 使用ThreadLocal,不用查找dict,ThreadLocal帮你自动做这件事 import threading # 创建全局ThreadLocal对象: local_school = threading.local() def process_student(): # 获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 线程执行的函数 # 绑定ThreadLocal的student: local_school.student = name # 给全局ThreadLocal赋值 process_student() # 执行process_student()函数 t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() # Hello, Alice (in Thread-A) # Hello, Bob (in Thread-B) # 一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。 # ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题
# 这个代码的py文件名为 mymode.py
from multiprocessing import Process import os def func(): print('in func',os.getpid(),os.getppid()) print(__name__) if __name__ == '__main__': # 当直接运行当前py文件时,__name__被赋值为 __main__, # 当文件作为模块被其他py文件引用执行其他py时,__name__ 为引用的模块名即 mymode print('in main', os.getpid(), os.getppid()) p = Process(target=func) p.start() # __main__ # in main 10616 5764 # __mp_main__ # 按照引用来说这里应该是 mymode,可能是Process这个模块做的处理,使得它的名字变为 __mp_main__ # in func 13564 10616
如果在另一个 py文件写如下代码
import mymode # 引用上面的文件作为模块
print(__name__)
# mymode mymode 中的 print(__name__)
# __main__ 本文件的 __name__
''' 进程:资源分配 的 最小单位 进程是一个正在运行的程序 程序和进程的区别 : 进程是正在运行的,程序没有运行 同步异步 阻塞非阻塞 同步 就是顺序执行 异步 可以同时执行 阻塞 停住 非阻塞 不停 并行和并发 并行 有多个CPU在同时执行任务 并发 只有一个CPU,交替执行多个任务 进程调度 : 就是多个进程(运行中的程序)在操作系统的控制下被CPU执行,去享用计算机的资源 先来先服务 短作业优先 时间片轮转 多级反馈队列 程序的并行与并发 并行更快 并发只是宏观上的同时执行 进程一共有三个状态: 就绪 运行 阻塞 os.getpid() 获取当前进程pid os.getppid() 获取父进程pid ''' import os import time from multiprocessing import Process # 进程模块 def func(): time.sleep(2) print('in func',os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 p2 = Process(target=func) # 进程对象 p2.start() # 向操作系统提交了一个开启子进程的申请 print('主进程 的 代码执行结束了') # 原理 # if __name__ == '__main__': # 使用python都是调用操作系统的命令来启动进程 # 同样使用python 不同的操作系统的操作是不同的 # 对于windows来说 必要加if __name__ == '__main__': # 对于linux ios来说 不必要加if __name__ == '__main__': # 给子进程传参数 Process(target=func,args=(1,)) 使用args元组传给子进程函数参 def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 p2 = Process(target=func,args=(2,)) # 进程对象 p2.start() # 向操作系统提交了一个开启子进程的申请 print('主进程 的 代码执行结束了') # 其他方法和属性 # 1.开启多个子进程 def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) for i in range(10): p = Process(target=func,args=(i,)) p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程 print('主进程 的 代码执行结束了') # 2.join方法 def func(num): time.sleep(1) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p = Process(target=func,args=(1,)) p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程 p.join() # 阻塞,直到p这个子进程执行完毕之后再继续执行 print('主进程 的 代码执行结束了') # 3.一批任务使用join def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p_l = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程,非阻塞 p_l.append(p) # 如果在for循环里直接 p.join()的话则会一个子进程执行完了之后才会开启下一个子进程然后执行 print(p_l) for p in p_l : p.join() # 阻塞,直到p这个子进程执行完毕之后再继续执行,在上一个for循环中所有的子进程都开启了,这里把这些子进程都设置为join,主进程等待这些子进程都执行完后才继续执行 print('主进程 的 代码执行结束了') # 4.is_alive terminate def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 print(p1.is_alive()) # 检测进程是否在执行任务 p1.terminate() # 强制结束子进程 - 非阻塞,就是说当执行这条命令后立即执行print(p1.is_alive())可能得到的结果是True,因为 p1.terminate()这条命令可能还没有执行完 print(p1.is_alive()) # 检测进程是否在执行任务 print('主进程 的 代码执行结束了') # 用面向对象的方式开启子进程 class MyProcess(Process): # 创建一个类继承Process方法 def __init__(self,num): # 使用num 传参 super().__init__() # 保留Process()方法的属性 self.num = num # 传参 def run(self): print('in run ',self.num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main ', os.getpid(), os.getppid()) p = MyProcess(1) p.start()
=================================================================================================== ''' 数据隔离 进程与进程之间的数据是隔离的;内存空间是不能共享的;所以要想进行通信,必须借助其他手段(socket通信中的文件家族或网络家族);且这两个进程都是自愿的 # 主进程和子进程数据隔离''' from multiprocessing import Process import time def func(i): time.sleep(1) print('子进程',i) if __name__ == '__main__': i = [] p = Process(target=func, args=(i,)) p.start() i.append(10) print('主进程',1) # 主进程 1 # 子进程 [] # 主进程创建子进程时传参 from multiprocessing import Process def func(i): print('子进程',i) if __name__ == '__main__': i = 10 p = Process(target=func, args=(i,)) p.start() print('主进程',i) # 主进程 10 # 子进程 10 ''' # 守护进程 *** # 守护进程会在主进程的代码执行完毕之后直接结束,无论守护进程是否执行完毕(注意这里是主进程的代码执行完毕而不是主进程结束,因为主进程需要给其他子进程收尸,即等待其他子进程结束后回收资源) # 守护进程常被用来报活,当服务器执行一段死循环的主进程时,使用子进程作为守护进程来报活,只有当主进程因为出错而跳出循环后然后执行完代码这时守护进程就结束了也就停止报活了(报活是守护进程每隔一段时间发送一条信息) '''def func1(): print('begin') time.sleep(3) print('wahaha') def func2(): while True: print('in func2') time.sleep(0.5) if __name__ == '__main__': Process(target=func1).start() p = Process(target=func2) p.daemon = True # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程 # 设置守护进程的操作应该在开启子进程之前 # p.start() # time.sleep(1) # print('主进程') # begin # in func2 # in func2 # 主进程 # wahaha ''' # 锁 Lock ***** # 当多个进程共享一段数据(文件,因为进程的内存的隔离的)的时候,数据会出现不安全的现象, # 需要加锁来维护数据的安全性 ''' lock = Lock() # 创造了一把锁 lock.acquire() # 获取了这把锁的钥匙 这个代码放到需要加锁的代码前 lock.release() # 归还这把锁的钥匙 这个代码放到需要加锁的代码后 # 可以用 with 上下文管理语句 with lock: 等价于 lock.acquire() 代码 <====> 代码 lock.release() # 这是因为 with as 的机制,先执行 __enter__ 语句,然后执行with内代码, 最后执行 __exit__ 语句 # Lock()类中一定是有 __enter__方法,这个方法中有lock.acquire()命令,也一定有__exit__方法,这个方法中有lock.release()命令 # 买票 多人买票 import json import time from multiprocessing import Lock from multiprocessing import Process def search(i): with open('db','r') as f:count_dic = json.load(f) # db 文件中存放中json格式的一个字典票数信息,{"count":1} time.sleep(0.2) # 模拟网络延时 print('person %s 余票 : %s张'%(i,count_dic['count'])) def buy(i): with open('db','r') as f:count_dic = json.load(f) time.sleep(0.2) if count_dic['count'] > 0: count_dic['count'] -= 1 print('person %s 购票成功'%i) time.sleep(0.2) with open('db','w') as f:json.dump(count_dic,f) def task(i,lock): search(i) lock.acquire() # 如果之前已经被acquire了 且 没有被release 那么进程会在这里阻塞 buy(i) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=task,args=(i,lock)) p.start() # 信号量 Semaphore *** # 信号量的机制就是 锁 + 计数器 # sem = Semaphore(3) # 设置钥匙个数,即有几个进程可以执行锁内的代码 # sem.acquire() # 拿钥匙执行代码 # print(1) # sem.acquire() # 拿钥匙执行代码 # print(2) # sem.acquire() # 拿钥匙执行代码 # print(3) # sem.acquire() # 阻塞,钥匙被拿完,但是没有还钥匙会阻塞,知道有进程还钥匙 # print(4) import time import random from multiprocessing import Process,Semaphore def ktv(num,sem): sem.acquire() print('person%s进入了ktv' % num) time.sleep(random.randint(1,4)) print('person%s走出了ktv' % num) sem.release() if __name__ == '__main__': sem = Semaphore(4) # 设置了信号量的个数为4,只能有四个进程同时拿到钥匙 for i in range(10): p = Process(target=ktv,args=(i,sem)) # 把Semaphore()的对象传到子进程中 p.start() ''' # 事件 Event **3 通过一个标记来控制对多个进程进行同步控制 wait() 方法,看Event()对象的标志,默认是False阻塞 如果事件对象标志是False 那么就阻塞 如果事件对象标志是True 那么就非阻塞 is_set()查看标志 set()将标志设置为True clear() 将标志设置为False ''' from multiprocessing import Event,Process e = Event() print(e.is_set()) # 在事件的创建之初 默认是False e.set() # 将标志设置为True print(e.is_set()) e.wait() # 相当于什么都没做pass e.clear() # 将标志设置为False # e.wait() # 永远阻塞 e.wait(timeout=10) # 如果信号在阻塞10s之内变为True,那么不继续阻塞直接pass, # 如果就阻塞10s之后状态还是没变,那么继续, print(e.is_set()) # 无论前面的wait的timeout是否通过,我的状态都不会因此改变 # 红绿灯模型 # 控制交通灯的进程 import time import random def traffic_light(e): print('