一、多线程编程
- 示例
import threading
import time
def Say_Hi(num):
print("hello %s" %num)
time.sleep(3)
print("hello2 %s" %num)
if __name__ == '__main__':
t1 = threading.Thread(target=Say_Hi, args=(111,)) #创建一个线程对象
t1.start()
t2 = threading.Thread(target=Say_Hi, args=(222,)) #创建一个线程对象
t2.start()
print("ending...")
1、join方法
import threading
import time
def Listen_Music(name):
print("%s begin to listen %s" %(name,time.ctime()))
time.sleep(3)
print("%s end to listen %s" %(name,time.ctime()))
def Play_Game(name):
print("%s begin to play game %s" %(name,time.ctime()))
time.sleep(5)
print("%s end to play game %s" %(name,time.ctime()))
if __name__ == '__main__':
music = threading.Thread(target=Listen_Music,args=("dongfei",))
game = threading.Thread(target=Play_Game,args=("dongfei",))
music.start()
music.join() #将等待music执行完成后再继续下边代码
game.start()
# music.join()
# game.join()
print("ending....")
2、setDaemon守护线程
import threading
import time
def Listen_Music(name):
print("%s begin to listen %s" %(name,time.ctime()))
time.sleep(3)
print("%s end to listen %s" %(name,time.ctime()))
def Play_Game(name):
print("%s begin to play game %s" %(name,time.ctime()))
time.sleep(5)
print("%s end to play game %s" %(name,time.ctime()))
music = threading.Thread(target=Listen_Music,args=("dongfei",))
game = threading.Thread(target=Play_Game,args=("dongfei",))
l = []
l.append(music)
l.append(game)
if __name__ == '__main__':
for i in l:
i.setDaemon(True) #设置为守护线程,主线程结束就退出
i.start()
print("ending....")
3、线程对象和threading模块的其他方法
-
run()
-
start() 将线程处于就绪状态
-
getName() 获取线程名
-
isAlive() 判断线程是否活动
-
setName() 设置线程名
-
threading.currentThread() 返回当前的线程变量
-
threading.enumerate() 返回一个正在运行的线程列表
-
threading.activeCount() 统计正在运行的线程数
4、线程锁(同步锁,递归锁)
-
GIL:全局解释器锁,同一时刻,只有一个线程在执行
- IO密集型任务:建议多线程执行
- CPU密集型任务:建议多进程 + 协成执行,由于GIL锁的原因,不推荐使用线程
-
同步锁(互斥锁)
import threading
import time
lock = threading.Lock() #创建锁
def sub():
global num
# num -= 1
lock.acquire() #加锁
temp = num
time.sleep(0.0001) #在此中间的过程只有一个线程执行
num = temp - 1
lock.release() #释放锁
num = 100
l = []
for i in range(100):
t = threading.Thread(target=sub)
t.start()
l.append(t)
for t in l:
t.join()
print(num)
- 死锁
import threading
import time
class MyThread(threading.Thread):
def actionA(self):
A.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(2)
B.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(1)
B.release()
A.release()
def actionB(self):
B.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(2)
B.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(1)
A.release()
B.release()
def run(self):
self.actionA()
self.actionB()
if __name__ == '__main__':
A = threading.Lock()
B = threading.Lock()
L = []
for i in range(5):
t = MyThread()
t.start()
L.append(t)
for i in L:
i.join()
print("ending...")
- 递归锁
import threading
import time
class MyThread(threading.Thread):
def actionA(self):
r_lock.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(2)
r_lock.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(1)
r_lock.release()
r_lock.release()
def actionB(self):
r_lock.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(2)
r_lock.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(1)
r_lock.release()
r_lock.release()
def run(self):
self.actionA()
self.actionB()
if __name__ == '__main__':
# A = threading.Lock()
# B = threading.Lock()
r_lock = threading.RLock() #递归锁
L = []
for i in range(5):
t = MyThread()
t.start()
L.append(t)
for i in L:
i.join()
print("ending...")
5、同步条件对象(EVENT对象)
- 设置flag,wait非阻塞;清除flag,wait阻塞
event = threading.Event()
event.set()
print(event.isSet()) #False or Ture
event.clear()
event.wait()
6、信号量
- 限制同时可以开几个线程
import threading, time
class MyThread(threading.Thread):
def run(self):
if semaphore.acquire(): #锁定,每次只能放5个执行
print(self.name)
time.sleep(2)
semaphore.release() #释放
if __name__ == '__main__':
semaphore = threading.Semaphore(5) #同时可以开5个线程
thrs = []
for i in range(100):
thrs.append(MyThread())
for t in thrs:
t.start()
7、线程队列(queue)
import queue
q = queue.Queue(3) #FIFO,默认先进先出;3表示只能放3个元素
q2 = queue.LifoQueue() #LIFO,后进先出
q3 = queue.PriorityQueue() #按优先级出
q3.put([2,"abc"]) #2为优先级,数字越大优先级越低
q.put(111)
q.put("hello")
q.put({"name":"dongfei"})
# q.put("abc",False) #如果传不进去则报错
while True:
data = q.get(block=False) #如果没有值则报错
print(data)
print("----")
- 队列模块的常见使用方法
print(q.qsize()) #队列的当前的值
print(q.empty()) #队列是否为空
print(q.full()) #队列是否已满
q.get_nowait() # == q.get(Flase)
q.task_done() #发信号
q.join() #收信号
- 消费者生产者模型
import time
import random
import queue
import threading
q = queue.Queue()
def Producer(name) #生产者
count = 0
while count < 10:
print("making...")
time.sleep(random.randrange(3))
q.put(count)
print("Producer %s has produced %s baozi." %(name,count))
count += 1
print("ok.")
def Consumer(name): #消费者
count = 0
while count < 10:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print("Consumer %s has eat %s baozi." %(name,data))
else:
print("no baozi anymore!")
count += 1
p1 = threading.Thread(target=Producer, args=("A",))
c1 = threading.Thread(target=Consumer, args=("B",))
p1.start()
c1.start()
二、多进程编程
- 多进程示例1
from multiprocessing import Process
import time
def show_time(name):
time.sleep(1)
print(name, time.ctime())
if __name__ == '__main__':
p_list = []
for i in range(3):
p = Process(target=show_time,args=('Tom',))
p_list.append(p)
p.start()
for i in p_list:
i.join()
print("end.")
- 多进程示例2
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self):
time.sleep(1)
print(self.name, time.ctime()) #self.name 进程名
if __name__ == '__main__':
p_list = []
for i in range(3):
p = MyProcess()
p_list.append(p)
p.start()
for i in p_list:
i.join()
print("end.")
1、守护进程
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self):
time.sleep(3)
print(self.name, time.ctime())
if __name__ == '__main__':
p_list = []
for i in range(3):
p = MyProcess()
p_list.append(p)
p.daemon = True #设置为守护进程
p.start()
# for i in p_list:
# i.join()
print("end.")
2、获取进程号
from multiprocessing import Process
import os
import time
def info(title):
print("title:", title)
print("parent process:", os.getppid())
print("process id:", os.getpid())
if __name__ == '__main__':
info("main process line")
time.sleep(1)
print("===========")
p = Process(target=info, args=("tom",))
p.start()
p.join()
3、实例方法和属性
- 实例方法
is_alive(): 判断进程是否运行
join([timeout]): 阻塞当前上下文,直到此实例完成
start(): 进程就绪,等待CPU调度运行
run(): start()调用run()方法
terminate(): 立即停止工作进程
- 属性
daemon: 同线程的setDeamon功能,设置为守护进程
name: 进程名
pid: 进程号
4、进程间通信
- 进程队列
from multiprocessing import Process
from multiprocessing import Queue
def foo(q):
q.put("dongfei")
if __name__ == '__main__':
q = Queue() #进程队列
p = Process(target=foo,args=(q,))
p.start()
p.join()
print(q.get())
- 管道
from multiprocessing import Process, Pipe
def foo(conn):
conn.send("dad hello")
response = conn.recv()
print("father say:", response)
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=foo, args=(child_conn,))
p.start()
print("son say:", parent_conn.recv())
parent_conn.send("son hello")
p.join()
- 数据共享
from multiprocessing import Process, Manager
def foo(d,i):
d.append(i)
if __name__ == '__main__':
with Manager() as manager:
d = manager.list()
for i in range(10):
p = Process(target=foo,args=(d,i))
p.start()
p.join()
print(d)
5、进程池
线程池的作用:比如现在有100个任务,我们可以1个1个的运行,但是这样运行时间太长;我们也可以直接起100个进程去处理,但是这样消耗的资源庞大,反而会更慢;所有我们选择一个折中的方法,每次起4个进程去处理,这个时候就需要用到进程池了
from multiprocessing import Process, Pool
import time,os
def foo(i):
time.sleep(1)
print(i)
return i+100
def bar(arg): #被主进程调用
print(arg) #arg是foo函数的返回值
print("is ok...")
if __name__ == '__main__':
pool = Pool(4)
for i in range(100):
# pool.apply(func=foo,args=(i,)) #同步
# pool.apply_async(func=foo,args=(i,)) #异步
pool.apply_async(func=foo,args=(i,), callback=bar) #回调函数:foo函数执行成功后,再去执行bar函数
pool.close() #此处必须先close,再join
pool.join()
print("end")
三、协程
又称微线程,纤程,Coroutine
概念:协作式,非抢占式
协程的优势:
- 没有切换的消耗
- 没有锁的概念
- 可以使用多进程 + 协程利用多核工作
- yield语法
import time
def consumer(name):
print("--->ready to eat baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
time.sleep(1)
def producer():
r = con.__next__()
r2 = con2.__next__()
n = 0
while True:
time.sleep(1)
print("[producer is making baozi %s and %s]" % (n, n+1))
con.send(n)
con2.send(n+1)
if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()
- greenlet模块:手动切换
from greenlet import greenlet
def t1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def t2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(t1)
gr2 = greenlet(t2)
gr2.switch()
- gevent模块:自动切换
import gevent
import requests,time
start = time.time()
def f(url):
print('GET: %s' %url)
resp = requests.get(url)
data = resp.text
f = open('new.html', 'w',encoding="utf8")
f.write(data)
print('%d bytes received from %s.' %(len(data), url))
gevent.joinall([
gevent.spawn(f, 'https://www.python.org'),
gevent.spawn(f, 'https://www.baidu.com'),
gevent.spawn(f, 'https://www.aliyun.com'),
])
end = time.time()
print(end - start)