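How do you tell processes and threads apart? In the plainest terms: multithreading is like several people working together on one job, while multiprocessing is like several people each doing the same job once on their own. More concretely, the threads of one process share its memory, while each process has its own separate memory (see 3.2).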
1. Threads
1.1 A simple thread
```python
import threading

def fo():
    print("hello")

def f1(a1, a2):
    fo()

t = threading.Thread(target=f1, args=(123, 11))  # create a child thread
t.start()
t = threading.Thread(target=f1, args=(123, 11))  # create another child thread
t.start()
```
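Besides passing target= and args=, a thread can also be defined by subclassing threading.Thread and overriding run(); start() then calls run() in the new thread. The following is a minimal sketch; the class name Greeter and its result attribute are made up for illustration.

```python
import threading

class Greeter(threading.Thread):
    """Hypothetical example thread that stores its result on the object."""

    def __init__(self, a1, a2):
        super().__init__()          # always initialise the base Thread first
        self.a1, self.a2 = a1, a2
        self.result = None

    def run(self):                  # executed in the new thread by start()
        self.result = f"hello {self.a1} {self.a2}"

t = Greeter(123, 11)
t.start()
t.join()                            # wait so that result is ready
print(t.result)                     # hello 123 11
```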
1.2 Whether the main thread waits for child threads (daemon threads)
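```python
import threading
import time

def fo():
    print("hello")

def f1(a1, a2):
    time.sleep(5)
    fo()

t = threading.Thread(target=f1, args=(123, 11))  # create a child thread
# False: the main thread waits for this thread; True: it does not wait.
# (t.daemon = ... replaces the deprecated t.setDaemon(...))
t.daemon = False
t.start()
t = threading.Thread(target=f1, args=(123, 11))
t.daemon = True
t.start()
```

Output:
hello

Usually only one hello appears: the interpreter waits for the first (non-daemon) thread, but not for the second (daemon) thread, which has not finished by the time the program shuts down.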
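The daemon flag can also be passed to the constructor. In this sketch, a daemon thread is still sleeping when the main thread reaches the end of the script, and the program exits immediately anyway instead of waiting 10 s. The worker function is hypothetical.

```python
import threading
import time

def worker():
    time.sleep(10)   # longer than the main thread will live
    print("never printed")

# daemon=True in the constructor is equivalent to setting t.daemon = True
t = threading.Thread(target=worker, daemon=True)
t.start()
print(t.daemon, t.is_alive())   # True True
# The main thread ends here; the interpreter does not wait for daemon
# threads, so the program exits right away.
```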
1.3 Waiting for a child thread: join()
t.join() makes the main thread wait until t has finished; join(1) waits at most 1 s.
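```python
import threading
import time

def fo():
    print("hello")

def f1(a1, a2):
    time.sleep(5)
    fo()

t = threading.Thread(target=f1, args=(123, 11))
t.start()
t.join()  # block until the first child thread has finished
t = threading.Thread(target=f1, args=(123, 11))
t.start()
```

Output:
hello
hello

The second hello appears 5 s after the first: when execution reaches t.join(), the main thread waits for the first child thread to finish before it moves on and starts the second one.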
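With a timeout, join() simply returns when the time is up, whether or not the thread has finished, so is_alive() is the way to tell which case happened. A small sketch with a hypothetical slow() task:

```python
import threading
import time

def slow():
    time.sleep(2)

t = threading.Thread(target=slow)
t.start()
t.join(0.5)                  # give up waiting after 0.5 s
still_running = t.is_alive()
print(still_running)         # True: the thread is still sleeping
t.join()                     # now wait without a timeout
print(t.is_alive())          # False
```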
1.4 Thread locks to prevent dirty data
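```python
import threading
import time

globals_num = 0
lock = threading.RLock()

def func():
    global globals_num
    lock.acquire()  # lock: only one thread at a time runs the code below
    try:
        globals_num += 1
        time.sleep(1)
        print(globals_num)
    finally:
        lock.release()  # unlock so the next thread can enter

for i in range(10):
    t = threading.Thread(target=func)  # create ten threads
    t.start()
```

Because the sleep happens while the lock is held, the ten threads pass through the locked section one after another, printing 1 to 10 at one-second intervals.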
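The same locking is usually written with a with statement, which releases the lock even if an exception is raised. The sketch below also shows why one might pick RLock over Lock: the thread that holds an RLock may acquire it again, so increment_twice() can call increment() while holding the lock. With a plain threading.Lock that inner acquisition would deadlock. The function names are illustrative.

```python
import threading

counter = 0
lock = threading.RLock()  # reentrant: the owning thread may acquire it again

def increment():
    global counter
    with lock:            # acquire on entry, release on exit
        counter += 1

def increment_twice():
    with lock:            # outer acquisition
        increment()       # inner acquisition by the same thread: fine for RLock
        increment()

threads = [threading.Thread(target=increment_twice) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 100
```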
1.5 Event: a rendezvous point (think of a traffic light)
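```python
import threading

def do(event):
    print("start")
    event.wait()  # the flag starts as False, so the thread blocks here: red light
    print("end")

event_obj = threading.Event()
for i in range(3):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

# event_obj.clear()  # set the flag to False: red light
inp = input(">>>>")
if inp == 'true':
    event_obj.set()  # set the flag to True: green light, all waiting threads continue
```

Typing true releases all three threads; any other input leaves them blocked, so the program never exits.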
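For a non-interactive version, the sketch below replaces input() with threading.Timer, which calls light.set() after 0.2 s. It also shows that wait(timeout) returns the flag's value, so it returns False while the light is still red. The names light, car and log are illustrative.

```python
import threading

light = threading.Event()     # flag starts False: red light
log = []

def car(n):
    light.wait()              # block until the flag becomes True
    log.append(n)

cars = [threading.Thread(target=car, args=(i,)) for i in range(3)]
for c in cars:
    c.start()

was_green = light.wait(timeout=0.1)
print(was_green)                          # False: still red after 0.1 s
threading.Timer(0.2, light.set).start()   # turn green after 0.2 s
for c in cars:
    c.join()
print(sorted(log))                        # [0, 1, 2]
```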
2. Queues (use cases: anything that lines up, such as the 12306 railway ticketing site, or games)
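```python
import queue
```

q.get(): waits until an item is available
q.get_nowait(): does not wait; raises queue.Empty if the queue is empty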
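A typical producer/consumer sketch with queue.Queue: the consumer blocks in get(), marks each item with task_done(), and q.join() waits until every item put into the queue has been processed. Using None as a stop signal is just a convention chosen for this example.

```python
import queue
import threading

q = queue.Queue(maxsize=3)   # put() blocks while 3 items are waiting
results = []

def consumer():
    while True:
        item = q.get()        # blocks until an item is available
        if item is None:      # sentinel: stop
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()         # tell the queue this item is processed

t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    q.put(i)
q.put(None)
q.join()                      # wait until every item has been task_done()
t.join()

got_empty = False
try:
    q.get_nowait()            # the queue is empty now, so this does not wait
except queue.Empty:
    got_empty = True
print(results, got_empty)     # [0, 2, 4, 6, 8] True
```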
3. Processes
3.1 A simple process
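```python
import multiprocessing

def f1(a1):
    print(a1)

# The guard is needed when child processes re-import this module
# (the spawn start method, the default on Windows and macOS).
if __name__ == '__main__':
    t = multiprocessing.Process(target=f1, args=(11,))
    t.start()
    t2 = multiprocessing.Process(target=f1, args=(12,))
    t2.start()
```

Output (the order is not guaranteed):
11
12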
3.2 Processes do not share data
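```python
from multiprocessing import Process

li = []

def foo(i):
    li.append(i)  # appends to this process's own copy of li
    print(li)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=foo, args=(i,))
        p.start()
```

Output (the order varies between runs):
[1]
[3]
[0]
[2]
[4]

Each list holds a single element because every process appends to its own copy of li; the parent's li stays empty.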
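Since a child cannot modify the parent's variables, results are usually sent back explicitly. A minimal sketch using multiprocessing.Queue; the squaring task is only an example. The items are read before join(), which avoids a possible deadlock when a child still has queued data.

```python
from multiprocessing import Process, Queue

def foo(i, q):
    q.put(i * i)          # send the result back instead of appending to a global

def main():
    q = Queue()
    procs = [Process(target=foo, args=(i, q)) for i in range(5)]
    for p in procs:
        p.start()
    results = [q.get() for _ in procs]   # one item per process; order may vary
    for p in procs:
        p.join()
    return sorted(results)

if __name__ == "__main__":
    print(main())   # [0, 1, 4, 9, 16]
```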
3.3 Sharing data between processes
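```python
from multiprocessing import Process, Manager

def foo(i, dic):
    # first process: {0: 100}; the second adds dic[1] = 101 on top of that
    dic[i] = 100 + i
    for k, v in dic.items():
        print(k, v)

if __name__ == '__main__':
    manager = Manager()
    dic = manager.dict()  # a Manager proxy is the usual way to share data
    for i in range(2):
        p = Process(target=foo, args=(i, dic))
        p.start()
        p.join()
```

Output:
0 100
0 100
1 101

Because p.join() sits inside the loop, the two processes run one after the other, and the second one sees the entry written by the first.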
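For a simple number, shared memory is lighter than a Manager. This sketch uses multiprocessing.Value; because "+=" on a shared value is a read followed by a write, the increment is wrapped in the value's own lock (get_lock()). The counts 4 and 1000 are arbitrary.

```python
from multiprocessing import Process, Value

def add(counter, n):
    for _ in range(n):
        with counter.get_lock():   # "+=" on a shared Value is not atomic
            counter.value += 1

def main():
    counter = Value("i", 0)        # a C int living in shared memory
    procs = [Process(target=add, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(main())   # 4000
```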
5. Process pool (Pool)
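pool.apply: tasks run one at a time; each call blocks until its task has finished, as if join() were called on every task.
pool.apply_async: tasks run concurrently and a callback can be attached. The call returns immediately without waiting, and because the pool's worker processes are daemonic, the main process must call pool.close() and pool.join() to wait for them.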
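```python
from multiprocessing import Pool
import time

def f1(a):
    time.sleep(3)
    print(a)
    return 100

def f2(arg):  # callback: runs in the parent process with f1's return value
    print(arg)

if __name__ == '__main__':
    pool = Pool(5)  # at most 5 worker processes
    for i in range(10):
        pool.apply_async(func=f1, args=(i,), callback=f2)
    pool.close()  # accept no more tasks
    pool.join()   # wait for all workers to finish
```

The output is not reproduced here; you can see five tasks print together, then the next five about 3 s later.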
```python
from multiprocessing import Pool
import time

def f1(a):
    time.sleep(3)
    print(a)

if __name__ == '__main__':
    pool = Pool(5)  # at most 5 worker processes
    for i in range(10):
        pool.apply(func=f1, args=(i,))  # blocks until this task is done
    pool.close()
    pool.join()
```

One result is printed every 3 s.
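When the return values are needed, Pool.map and the AsyncResult objects returned by apply_async are often simpler than callbacks. A sketch with a hypothetical square() task. Note that leaving the with block calls pool.terminate(), so all results are collected inside it.

```python
from multiprocessing import Pool

def square(x):
    return x * x

def main():
    with Pool(4) as pool:                        # terminate() is called on exit
        mapped = pool.map(square, range(10))     # blocks; results in input order
        pending = [pool.apply_async(square, (i,)) for i in range(3)]
        fetched = [r.get(timeout=10) for r in pending]   # wait for each result
    return mapped, fetched

if __name__ == "__main__":
    print(main())   # ([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], [0, 1, 4])
```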
6. Thread pools
6.1 A simple thread pool
```python
import threading
import queue
import time

class ThreadPool:
    def __init__(self, max_num=20):
        # The queue holds max_num "tokens" (the Thread class itself), not running threads
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()  # blocks while every token is in use

    def add_thread(self):
        self.queue.put(threading.Thread)  # hand the token back

def func(pool, args):
    time.sleep(2)
    pool.add_thread()  # the task is done: free a slot
    print(args)

p = ThreadPool(10)
for i in range(100):
    thread = p.get_thread()  # at most 10 tasks run at once
    r = thread(target=func, args=(p, i))
    r.start()
```
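The simple pool above does not actually reuse threads: every task still gets a new thread, and the queue of tokens only caps how many run at once. A threading.BoundedSemaphore expresses that same cap more directly. This is a sketch with an arbitrary limit of 3; the active/peak counters exist only to check the limit.

```python
import threading
import time

limit = threading.BoundedSemaphore(3)    # at most 3 tasks run at the same time
active = 0
peak = 0
counter_lock = threading.Lock()

def task(i):
    global active, peak
    try:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)                 # simulated work
        with counter_lock:
            active -= 1
    finally:
        limit.release()                  # give the slot back, like add_thread()

threads = []
for i in range(10):
    limit.acquire()                      # blocks while 3 tasks are running, like get_thread()
    t = threading.Thread(target=task, args=(i,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(peak)   # never more than 3
```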
6.2 A more practical thread pool
```python
import queue
import threading
import contextlib
import time

StopEvent = object()  # sentinel telling a worker thread to exit


class ThreadPool(object):
    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)  # bounded task queue
        else:
            self.q = queue.Queue()              # unbounded task queue
        self.max_num = max_num    # maximum number of threads
        self.cancel = False       # set by close(): accept no new tasks
        self.terminal = False     # set by terminate(): stop as soon as possible
        self.generate_list = []   # threads created so far
        self.free_list = []       # idle threads waiting for a task

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.
        :param func: task function
        :param args: arguments for the task function
        :param callback: called after the task succeeds or fails, with two arguments:
                         1. whether the task succeeded; 2. its return value
                         (default None: no callback)
        :return: True if the pool has already been closed, otherwise None
        """
        if self.cancel:
            return True
        # create a new thread only if none is idle and the limit is not reached
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """Create and start one worker thread."""
        t = threading.Thread(target=self.call)
        self.generate_list.append(t)  # register before start() so run() counts it at once
        t.start()

    def call(self):
        """Keep fetching tasks from the queue and running them."""
        current_thread = threading.current_thread()
        event = self.q.get()
        while event is not StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception:
                success = False
                result = None
            if callback is not None:
                try:
                    callback(success, result)
                except Exception:
                    pass
            # while waiting for the next task, this thread is listed as idle
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """Stop all threads once every queued task has finished."""
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)  # one stop signal per thread, queued after the tasks
            full_size -= 1

    def terminate(self):
        """Stop the threads whether or not tasks remain."""
        self.terminal = True
        while self.generate_list:      # keep sending stop signals until every thread has exited
            self.q.put(StopEvent)
        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """Keep worker_thread in state_list (here: free_list) while the block runs."""
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use
pool = ThreadPool(5)

def callback(status, result):  # callback function
    # status: whether the task ran successfully
    # result: the task's return value
    pass

def action(i):
    time.sleep(5)
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()
```
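The standard library ships a ready-made thread pool, concurrent.futures.ThreadPoolExecutor. The sketch below mirrors the usage above: max_workers plays the role of ThreadPool(5), and a done-callback recovers the same (success, result) pair from the Future. The failing task 3 is made up to exercise the error path.

```python
import time
from concurrent.futures import ThreadPoolExecutor

results = []

def action(i):
    time.sleep(0.1)
    if i == 3:
        raise ValueError("task 3 failed")   # hypothetical failure
    return i * 10

def callback(future):
    # Mirrors callback(status, result): exception() is None on success;
    # the future is already finished when this runs, so nothing blocks.
    success = future.exception() is None
    results.append((success, future.result() if success else None))

with ThreadPoolExecutor(max_workers=5) as pool:   # like ThreadPool(5)
    for i in range(6):
        pool.submit(action, i).add_done_callback(callback)
# leaving the with block waits for all tasks, like close() plus waiting

ok = sorted(r for s, r in results if s)
failed = [r for s, r in results if not s]
print(ok, len(failed))   # [0, 10, 20, 40, 50] 1
```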