池的概念:
回调函数:pool 这个类中 回调函数是主进程执行
如果有两个任务,我的第二个任务在第一个任务执行完毕之后能够立即被主进程执行
线程的概念:
进程时操作系统中的最小资源分配单位
线程是cpu调度的最小单位
线程和进程之间的对比:
线程不能独立存在,必须在一个进程里
线程的开启 关闭以及切换的开销要远远小于进程
同一个进程之间的多个线程之间数据共享
全局解释锁GIL
使得一个进程的多个线程不能充分利用多核
进程号还是线程好
什么时候用多进程 什么时候用多线程
并发
cpu使用率:计算 -多进程
IO操作:网络/文件/数据库 -多线程
2000
进程+线程
线程 Thread模块
同步控制 : 锁 事件 信号量 条件 定时器 队列
线程池/进程池
线程Tread模块:和进程的大致使用方法一样,福利啊
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主线程') 创建线程的方式1
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主线程') 创建线程的方式2
线程和进程的差异
pid的比较:在一个进程内的线程是同一个pid,而进程的pid不一样
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/主进程pid',os.getpid()) #part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid()) pid的比较
开启效率的问题:这两个适用的方面不同
# import time # from threading import Thread # from multiprocessing import Process # 效率差别 # def func(a): # a = a + 1 # # if __name__ == '__main__': # start = time.time() # t_l = [] # for i in range(50): # t = Thread(target=func,args=(i,)) # t.start() # t_l.append(t) # # t_l 50个线程对象 # for t in t_l : t.join() # print('主线程') # print(time.time() - start) # # start = time.time() # t_l = [] # for i in range(50): # t = Process(target=func, args=(i,)) # t.start() # t_l.append(t) # # t_l 50个线程对象 # for t in t_l: t.join() # print('主进程') # print(time.time() - start)
内存共享问题:进程里的多个线程共享进程里的资源
# 线程之间的数据共享 # from threading import Thread # n = 100 # def func(): # global n # n -= 1 # # if __name__ == '__main__': # t_l = [] # for i in range(100): # t = Thread(target=func) # t.start() # t_l.append(t) # for t in t_l: # t.join() # print(n)
多线程起socket:和进程相比,这个好
#_*_coding:utf-8_*_ #!/usr/bin/env python import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start() server
#_*_coding:utf-8_*_ #!/usr/bin/env python import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data) client
擦一个:线程的查看方法:get_ident,需要添加到threading后
类的其他方法:
Thread实例对象的方法:
isAlive()判断线程的状态
getName()返回线程的名字
setName()设置线程的名字
threading模块提供的方法:
currentTread()返回当前线程的变量
enumerate返回正在运行的线程,list形式
active返回线程的数量,注意提前关闭就看不到了
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主进程下开启线程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主线程 print(threading.enumerate()) #连同主线程在内有两个运行的线程 print(threading.active_count()) print('主线程/主进程') ''' 打印结果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主线程/主进程 Thread-1 ''' 代码示例
join方法,线程/进程执行完以后进行主线程/进程的执行,注意有多个时候需要放入列表中
import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主线程') print(t.is_alive()) ''' egon say hello 主线程 False ''' join方法
守护进程:
主线程如果结束了 那么整个进程就结束 # 守护线程 会等待主线程结束之后才结束. # 主进程 等待 守护进程 子进程 # 守护进程 只守护主进程的代码就可以了 # 守护线程不行 主线程如果结束了 那么整个进程就结束 所有的线程就都结束
无论是进程还是线程,都遵循:守护**会等待**运行完毕后被销毁,需要强调的是:运行完毕并非终止运行
对主进程来说:运行完毕时指主进程代码运行完毕
对于主线程来说:运行完毕时指所在的进程内所有非瘦护线程统统运行完毕,主进程才算运行完毕
1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
import time # from threading import Thread # def thread1(): # while True: # print(True) # time.sleep(0.5) # # def thread2(): # print('in t2 start') # time.sleep(3) # print('in t2 end') # # if __name__ == '__main__': # t1 = Thread(target=thread1) # t1.setDaemon(True) # t1.start() # t2 = Thread(target=thread2) # t2.start() # time.sleep(1) # print('主线程')
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必须在t.start()之前设置 t.start() print('主线程') print(t.is_alive()) ''' 主线程 True ''' 守护线程例1
锁与GIL:cpython:全局解释锁:
在多个进程线程同时访问一个数据的时候就会产生数据不安全的现象 # 多进程 访问文件 # 多线程 # 同时去访问一个数据 # GIL 全局解释器锁 # 在同一个进程里的每一个线程同一时间只能有一个线程访问cpu # 尽量不要设置全局变量 # 只要在多线程/进程之间用到全局变量 就加上锁
from threading import Lock,Thread # lock = Lock() # lock.acquire() # lock.acquire() # noodle = 100 # def func(name,lock): # global noodle # lock.acquire() # noodle -= 1 # lock.release() # print('%s吃到面了'%name) # # if __name__ == '__main__': # lock = Lock() # 线程锁 互斥锁 # t_lst = [] # for i in range(10): # t = Thread(target=func,args=(i,lock)) # t.start() # t_lst.append(t) # for t in t_lst: # t.join() # print(noodle)
递归锁:
死锁 # 多把锁同时应用在多个线程中 # 互斥锁和递归锁哪个好 # 递归锁 快速恢复服务 # 死锁问题的出现 是程序的设计或者逻辑的问题 # 还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁 # 互斥锁和递归锁的区别 # 互斥锁 就是在一个线程中不能连续多次ACQUIRE # 递归锁 可以在同一个线程中acquire任意次,注意acquire多少次就需要release多少次
进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release() 死锁
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release() 递归锁RLock
科学家吃面:
科学家吃面问题 # import time # from threading import Thread,Lock # lock = Lock() # noodle_lock = Lock() # fork_lock = Lock() # def eat1(name): # noodle_lock.acquire() # print('%s拿到了面' % name) # fork_lock.acquire() # print('%s拿到了叉子' % name) # print('%s在吃面'%name) # time.sleep(0.5) # fork_lock.release() # 0.01 # noodle_lock.release() # 0.01 # # def eat2(name): # fork_lock.acquire() # 0.01 # print('%s拿到了叉子' % name) # 0.01 # noodle_lock.acquire() # print('%s拿到了面' % name) # print('%s在吃面'%name) # time.sleep(0.5) # noodle_lock.release() # fork_lock.release() # # eat_lst = ['alex','wusir','太白','yuan'] # for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 # Thread(target=eat1,args=(name,)).start() # Thread(target=eat2,args=(name,)).start()
解决方案:
1:
递归锁解决死锁问题 # import time # from threading import Thread,RLock # lock = RLock() # def eat1(name): # lock.acquire() # print('%s拿到了面' % name) # lock.acquire() # print('%s拿到了叉子' % name) # print('%s在吃面'%name) # time.sleep(0.5) # lock.release() # 0.01 # lock.release() # 0.01 # # def eat2(name): # lock.acquire() # 0.01 # print('%s拿到了叉子' % name) # 0.01 # lock.acquire() # print('%s拿到了面' % name) # print('%s在吃面'%name) # time.sleep(0.5) # lock.release() # lock.release() # # eat_lst = ['alex','wusir','太白','yuan'] # for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 # Thread(target=eat1,args=(name,)).start() # Thread(target=eat2,args=(name,)).start()
2:
互斥锁解决死锁问题 # import time # from threading import Thread,Lock # lock = Lock() # def eat1(name): # lock.acquire() # print('%s拿到了面' % name) # print('%s拿到了叉子' % name) # print('%s在吃面'%name) # time.sleep(0.5) # lock.release() # 0.01 # # def eat2(name): # lock.acquire() # 0.01 # print('%s拿到了叉子' % name) # 0.01 # print('%s拿到了面' % name) # print('%s在吃面'%name) # time.sleep(0.5) # lock.release() # # eat_lst = ['alex','wusir','太白','yuan'] # for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 # Thread(target=eat1,args=(name,)).start() # Thread(target=eat2,args=(name,)).start()
信号量:锁 + 计数器
同进程的一样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
import time # from multiprocessing import Semaphore,Process,Pool # def ktv1(sem,i): # sem.acquire() # i += 1 # sem.release() # # def ktv2(i): # i += 1 # # if __name__ == '__main__': # sem = Semaphore(5) # start = time.time() # p_l = [] # for i in range(100): # p = Process(target=ktv1,args=(sem,i)) # p.start() # p_l.append(p) # for p in p_l : p.join() # print('###',time.time() - start) # # start = time.time() # p = Pool(5) # p_l = [] # for i in range(100): # ret = p.apply_async(func=ktv2, args=(sem, i)) # p_l.append(ret) # p.close() # p.join() # print('***',time.time() - start)
池 和 信号量
# 池 效率高
# 池子里有几个一共就起几个
# 不管多少任务 池子的个数是固定的
# 开启进程和关闭进程这些事都是需要固定的开销
# 就不产生额外的时间开销
# 且进程程池中的进程数控制的好,那么操作系统的压力也小
# 信号量
# 有多少个任务就起多少进程/线程
# 可以帮助你减少操作系统切换的负担
# 但是并不能帮助你减少进/线程开启和关闭的时间
事件:
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
# wait
# 等 到 事件内部的信号变成True就不阻塞了
# set
# 设置信号变成True
# clear
# 设置信号变成False
# is_set
# 查看信号是否为True
# import time
# import random
# from threading import Event,Thread
# def check(e):
# '''检测一下数据库的网络和我的网络是否通'''
# print('正在检测两台机器之间的网络情况 ...')
# time.sleep(random.randint(1,3))
# e.set()
#
# def connet_db(e):
# e.wait()
# print('连接数据库 ... ')
# print('连接数据库成功~~~')
#
# e = Event()
# Thread(target=connet_db,args=(e,)).start()
# Thread(target=check,args=(e,)).start()
# import random
# from threading import Event,Thread
# def check(e):
# '''检测一下数据库的网络和我的网络是否通'''
# print('正在检测两台机器之间的网络情况 ...')
# time.sleep(random.randint(0,2))
# e.set()
#
# def connet_db(e):
# n = 0
# while n < 3:
# if e.is_set():
# break
# else:
# e.wait(0.5)
# n += 1
# if n == 3:
# raise TimeoutError
# print('连接数据库 ... ')
# print('连接数据库成功~~~')
#
# e = Event()
# Thread(target=connet_db,args=(e,)).start()
# Thread(target=check,args=(e,)).start()
条件:
和信号量,信号量是在做判断的时候用的,不限线程
条件是可以指定执行几个线程
使得线程等待,只有满足某条件时,才释放n个线程
# acquire
# release
# wait 阻塞
# notify 让wait解除阻塞的工具
# wait还是notify在执行这两个方法的前后 必须执行acquire和release
# from threading import Condition,Thread
# def func(con,i):
# con.acquire()
# # 判断某条件
# con.wait()
# print('threading : ',i)
# con.release()
#
# con = Condition()
# for i in range(20):
# Thread(target=func,args=(con,i)).start()
# con.acquire()
# # 帮助wait的子线程处理某个数据直到满足条件
# con.notify_all()
# con.release()
# while True:
# num = int(input('num >>>'))
# con.acquire()
# con.notify(num)
# con.release()
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
定时器:指定运行时间,到时间后自动运行,
定时器,指定n秒后执行某个操作
from threading import Timer def func(): print('执行我啦') # interval 时间间隔 Timer(0.2,func).start() # 定时器 # 创建线程的时候,就规定它多久之后去执行