from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
print('主线程')
View Code
from threading import Thread
import time
class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print('%s say hello' % self.name)
if __name__ == '__main__':
t = Sayhi('egon')
t.start()
print('主线程')
View Code
from threading import Thread
from multiprocessing import Process
import os
def work():
print('hello',os.getpid())
if __name__ == '__main__':
#part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
t1=Thread(target=work)
t2=Thread(target=work)
t1.start()
t2.start()
print('主线程/主进程pid',os.getpid())
#part2:开多个进程,每个进程都有不同的pid
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主线程/主进程pid',os.getpid())
View Code
from threading import Thread
from multiprocessing import Process
import os
def work():
print('hello')
if __name__ == '__main__':
#在主进程下开启线程
t=Thread(target=work)
t.start()
print('主线程/主进程')
'''
打印结果:
hello
主线程/主进程
'''
#在主进程下开启子进程
t=Process(target=work)
t.start()
print('主线程/主进程')
'''
打印结果:
主线程/主进程
hello
'''
View Code
from threading import Thread
from multiprocessing import Process
import os
def work():
global n
n=0
if __name__ == '__main__':
# n=100
# p=Process(target=work)
# p.start()
# p.join()
# print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
n=1
t=Thread(target=work)
t.start()
t.join()
print('主',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据
同一进程内的线程共享该进程的数据?
View Code
import time
from threading import Thread
from multiprocessing import Process
效率差别
def func(a):
a = a + 1
if __name__ == '__main__':
start = time.time()
t_l = []
for i in range(50):
t = Thread(target=func,args=(i,))
t.start()
t_l.append(t)
# t_l 50个线程对象
for t in t_l : t.join()
print('主线程')
print(time.time() - start)
start = time.time()
t_l = []
for i in range(50):
t = Process(target=func, args=(i,))
t.start()
t_l.append(t)
# t_l 50个线程对象
for t in t_l: t.join()
print('主进程')
print(time.time() - start)
start join
View Code
terminate (强制结束一个正在运行的进程)--非阻塞 在线程中没有
Thread实例对象的方法
线程之间的数据共享 (可以引用全局变量)
from threading import Thread
n = 100
def func():
global n
n -= 1
if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
for t in t_l:
t.join()
print(n)
View Code
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
t.join()
print('主线程')
print(t.is_alive())
'''
egon say hello
主线程
False
'''
View Code
import time
from threading import Thread
def thread1():
while True:
print(True)
time.sleep(0.5)
def thread2():
print('in t2 start')
time.sleep(3)
print('in t2 end')
if __name__ == '__main__':
t1 = Thread(target=thread1)
t1.setDaemon(True)
t1.start()
t2 = Thread(target=thread2)
t2.start()
time.sleep(1)
print('主线程')
View Code
主线程如果结束了 那么整个进程就结束
守护线程 会等待主线程(及其他子线程)结束之后才结束.
主进程 等待 守护进程 子进程
守护进程 只守护主进程的代码就可以了
守护线程不行 主线程如果结束了,其他子进程也结束了, 那么整个进程就结束 所有的线程就都结束
使用多线程实现tcp协议的socket server
客户端:
import multiprocessing
import threading
import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
def action(conn):
while True:
data=conn.recv(1024)
print(data)
conn.send(data.upper())
if __name__ == '__main__':
while True:
conn,addr=s.accept()
p=threading.Thread(target=action,args=(conn,))
p.start()
View Code
import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
s.send(msg.encode('utf-8'))
data=s.recv(1024)
print(data)
View Code
from threading import Thread,get_ident
开启线程的第二种方式和查看线程id
class MyThread(Thread):
def __init__(self,args):
super().__init__() # Thread类的init,在这个方法中做了很多对self的赋值操作,都是给创建线程或者使用线程的时候用的
self.args = args
def run(self):
print('in my thread : ',get_ident(),self.args)
print('main',get_ident())
t = MyThread('wahaha')
t.start()
View Code
import time
from threading import Thread,get_ident,currentThread,enumerate,activeCount
开启线程的第二种方式和查看线程id
class MyThread(Thread):
def __init__(self,args):
super().__init__() # Thread类的init,在这个方法中做了很多对self的赋值操作,都是给创建线程或者使用线程的时候用的
self.args = args
def run(self):
time.sleep(0.1)
# print(currentThread())
print('in my thread : ',get_ident(),self.args) #查看id
print('main',get_ident())
t = MyThread('wahaha')
# print(t.is_alive())
t.start()
print(activeCount()) # 正在运行的线程的数量 len(enumerate())
# print(enumerate()) #有相同的结果。
# print('t : ',t)
# print(t.is_alive())
View Code
Thread实例对象的方法:
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
thread对象的其他方法 : isAlive ,setname,getname
threading模块的方法 : currentTread,activeCount,enumerate
在多个进程线程同时访问一个数据的时候就会产生数据不安全的现象
多进程 访问文件
多线程
同时去访问一个数据
GIL 全局解释器锁
在同一个进程里的每一个线程同一时间只能有一个线程访问cpu
尽量不要设置全局变量
只要在多线程/进程之间用到全局变量 就加上锁
from threading import Lock,Thread
lock = Lock()
lock.acquire()
# lock.acquire()
noodle = 100
def func(name,lock):
global noodle
lock.acquire()
noodle -= 1
lock.release()
print('%s吃到面了'%name)
if __name__ == '__main__':
lock = Lock() # 线程锁 互斥锁
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,lock))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(noodle)
View Code
import time
from threading import Thread,Lock
lock = Lock()
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s拿到了面' % name)
fork_lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
fork_lock.release() # 0.01
noodle_lock.release() # 0.01
def eat2(name):
fork_lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
noodle_lock.acquire()
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
noodle_lock.release()
fork_lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
View Code
from threading import RLock
rlock = RLock()
rlock.acquire()
print(1)
rlock.acquire()
print(2)
rlock.acquire()
print(3)
View Code
import time
from threading import Thread,RLock
lock = RLock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release() # 0.01
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
lock.acquire()
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release()
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
View Code
import time
from threading import Thread,Lock
lock = Lock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
View Code
死锁
多把锁同时应用在多个线程中
互斥锁和递归锁哪个好
递归锁 快速恢复服务
死锁问题的出现 是程序的设计或者逻辑的问题
还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁
互斥锁和递归锁的区别
互斥锁 就是在一个线程中不能连续多次ACQUIRE
递归锁 可以在同一个线程中acquire任意次,注意acquire多少次就需要release多少次
信号量
锁 + 计数器
信号量与进程池的效率对比:
import time
from multiprocessing import Semaphore,Process,Pool
def ktv1(sem,i):
sem.acquire()
i += 1
sem.release()
def ktv2(i):
i += 1
if __name__ == '__main__':
sem = Semaphore(5)
start = time.time()
p_l = []
for i in range(100):
p = Process(target=ktv1,args=(sem,i))
p.start()
p_l.append(p)
for p in p_l : p.join()
print('###',time.time() - start)
start = time.time()
p = Pool(5)
p_l = []
for i in range(100):
ret = p.apply_async(func=ktv2, args=(sem, i))
p_l.append(ret)
p.close()
p.join()
print('***',time.time() - start)
View Code
池 和 信号量
池 效率高
池子里有几个一共就起几个
不管多少任务 池子的个数是固定的
开启进程和关闭进程这些事都是需要固定的开销
就不产生额外的时间开销
且进程程池中的进程数控制的好,那么操作系统的压力也小
信号量
有多少个任务就起多少进程/线程
可以帮助你减少操作系统切换的负担
但是并不能帮助你减少进/线程开启和关闭的时间
事件
wait
等 到 事件内部的信号变成True就不阻塞了
set
设置信号变成True
clear
设置信号变成False
is_set
查看信号是否为True
import time
import random
from threading import Event,Thread
def check(e):
'''检测一下数据库的网络和我的网络是否通'''
print('正在检测两台机器之间的网络情况 ...')
time.sleep(random.randint(1,3))
e.set()
def connet_db(e):
e.wait()
print('连接数据库 ... ')
print('连接数据库成功~~~')
e = Event()
Thread(target=connet_db,args=(e,)).start()
Thread(target=check,args=(e,)).start()
import time
import random
from threading import Event,Thread
def check(e):
'''检测一下数据库的网络和我的网络是否通'''
print('正在检测两台机器之间的网络情况 ...')
time.sleep(random.randint(0,2))
e.set()
def connet_db(e):
n = 0
while n < 3:
if e.is_set():
break
else:
e.wait(0.5)
n += 1
if n == 3:
raise TimeoutError
print('连接数据库 ... ')
print('连接数据库成功~~~')
e = Event()
Thread(target=connet_db,args=(e,)).start()
Thread(target=check,args=(e,)).start()
View Code
from threading import Condition
acquire
release
wait 阻塞
notify 让wait解除阻塞的工具
wait 或是 notify在执行这两个方法的前后 必须执行acquire和release
from threading import Condition,Thread
def func(con,i):
con.acquire()
# 判断某条件
con.wait()
print('threading : ',i)
con.release()
con = Condition()
for i in range(20):
Thread(target=func,args=(con,i)).start()
con.acquire()
# 帮助wait的子线程处理某个数据直到满足条件
con.notify_all()
con.release()
while True:
num = int(input('num >>>'))
con.acquire()
con.notify(num)
con.release()
View Code
定时器:
from threading import Timer
def func():
print('执行我啦')
interval 时间间隔
Timer(0.2,func).start() # 定时器
创建线程的时候,就规定它多久之后去执行
队列:
from multiprocessing import Queue,JoinableQueue # 进程IPC队列
from queue import Queue # 线程队列 先进先出的
from queue import LifoQueue # 后进先出的
put get put_nowait get_nowait full empty qsize
队列Queue
先进先出
自带锁 数据安全
栈 LifoQueue
后进先出
自带锁 数据安全
lq = LifoQueue(5)
lq.put(123)
lq.put(456)
lq.put('abc')
lq.put('abc')
lq.put('abc')
lq.put('abc')
lq.put('abc')
print(lq)
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
from queue import PriorityQueue # 优先级队列
pq = PriorityQueue()
pq.put((10,'aaa'))
pq.put((5,'zzz'))
pq.put((5,'bbb'))
pq.put((20,'ccc'))
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
View Code
线程池:
Threading 没有线程池的
Multiprocessing Pool
concurrent.futures帮助你管理线程池和进程池
import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor # 帮助你启动线程池的类
from concurrent.futures import ProcessPoolExecutor # 帮助你启动线程池的类
def func(i):
time.sleep(1)
print('in %s %s'%(i,currentThread()))
return i**2
def back(fn):
print(fn.result(),currentThread())
map启动多线程任务
t = ThreadPoolExecutor(5)
t.map(func,range(20))
for i in range(20):
t.submit(func,i)
submit异步提交任务
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(fn=func,)
t.shutdown()
print('main : ',currentThread())
起多少个线程池
5*CPU的个数
获取任务结果
t = ThreadPoolExecutor(20)
ret_l = []
for i in range(20):
ret = t.submit(func,i)
ret_l.append(ret)
t.shutdown()
for ret in ret_l:
print(ret.result())
print('main : ',currentThread())
回调函数
t = ThreadPoolExecutor(20)
for i in range(100):
t.submit(func,i).add_done_callback(back)
import os
import time
from concurrent.futures import ProcessPoolExecutor # 帮助你启动线程池的类
def func(i):
time.sleep(1)
print('in %s %s'%(i,os.getpid()))
return i**2
def back(fn):
print(fn.result(),os.getpid())
if __name__ == '__main__':
print('main : ',os.getpid())
t = ProcessPoolExecutor(20)
for i in range(100):
t.submit(func,i).add_done_callback(back)
View Code
multiprocessing模块自带进程池的
threading模块是没有线程池的
concurrent.futures 进程池 和 线程池
高度封装
进程池/线程池的统一的使用方式
创建线程池/进程池 ProcessPoolExecutor ThreadPoolExecutor
ret = t.submit(func,arg1,arg2....) 异步提交任务
ret.result() 获取结果,如果要想实现异步效果,应该是使用列表
map(func,iterable)
shutdown close+join 同步控制的
add_done_callback 回调函数,在回调函数内接收的参数是一个对象,需要通过result来获取返回值
回调函数仍然在主进程中执行
线程总结:
线程
守护线程
在主线程结束(包括所有的子线程)之后守护线程才结束
面向对象开启线程
原本写在func中的代码挪到run方法中
同步的机制
锁
GIL锁 是全局解释器锁 线程没有并行
是Cpython解释器的
在开发python解释器的时候可以减少很多细粒度的锁
为什么有了GIL锁还会产生数据安全问题呢?
因为GIL锁得是线程 而不是具体的内存
互斥锁 不能连续acquire两次
递归锁 可以在用一个线程/进程中被acquire多次
死锁 : 是一种现象,而不是一个工具
为什么产生死锁 : 代码的逻辑有问题
如何解决死锁 :
如果在服务阶段 -> 递归锁 ->排查逻辑 -> 互斥锁
如果在测试阶段 -> ->排查逻辑 -> 互斥锁
信号量 锁+计数器
和池的区别 : 信号量是有几个任务起几个进程/线程 池是固定的线程/进程数,不限量的任务
现象 : 信号量慢 且耗资源 池快
事件 Event wait set clear is_set
条件 condition wait notify acquire release
定时器 Timer
join 同步控制 用来获取结果
锁 数据安全
池 提高效率,解决并发