一. 线程同步之信号量--semaphore
作用:控制进入数量的锁
举个例子:
写文件的时候,一般只用于一个线程写;读文件的时候可以用多个线程读,我们可以用信号量来控制多少个线程读文件
做爬虫的时候,也可以用信号量来控制并发数量,以免访问量过多而被反爬,如下面代码
import threading
import time
# 模拟2秒钟抓取一个html
class HtmlSpider(threading.Thread):
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
time.sleep(2)
print("success")
class UrlProducer(threading.Thread):
def run(self):
for i in range(10):
html_thread = HtmlSpider("https://baidu.com/{}".format(i))
html_thread.start()
if __name__ == "__main__":
url_producer = UrlProducer()
url_producer.start()
这样会生成10个线程,并一次输出10个success,那么如果想每次生成3个线程去爬取,怎么做
import threading
import time
# 模拟2秒钟抓取一个html
class HtmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("success")
self.sem.release() # 第三步:在这里释放锁,因为线程里运行的是爬虫
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(10):
self.sem.acquire() # 第二步:获得锁,每获得一个锁信号量中的值就减一。获得3个锁时暂停程序,等待锁释放,看Semaphore源码
html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(3) # 第一步,设置3个并发
url_producer = UrlProducer(sem)
url_producer.start()
二.ThreadPoolExcutor线程池
为什么要用线程池
1)线程池提供一个最大线程允许的数量,当任务请求过多而超过线程池最大值时,就会造成阻塞。这个功能信号量也能做到
2)线程池允许主线程中获得某一个线程的状态,或者某一个任务的状态以及返回值
3)当一个线程完成时,主线程能立即知道
4)futures模块可以让多线程和多进程编码接口一致,如果想把多线程切换为多进程就会很方便
例子1
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
time.sleep(times)
print("use {} to success".format(times))
return "运行的时间是{}秒".format(times)
executor = ThreadPoolExecutor(max_workers=2) # 生成一个线程池对象,设置线程池里同时运行的数量
# 通过submit函数提交执行的函数到线程池中,返回一个Future对象
task1 = executor.submit(get_html, (2)) #(2)为函数get_html中的参数值
task2 = executor.submit(get_html, (1))
# 返回对象的done方法可用于判断任务是否执行成功,并且是立即执行,这里用task1为例子
print(task1.done())
time.sleep(3) #等待3秒后,在用done方法测试,结果为True. 可能是pychram内部计算问题,这里不能写2,否则会显示False
print(task1.done())
print(task1.result()) #result方法可获得get_html函数的返回值
输出为
False
use 1 to success
use 2 to success
True
运行的时间是2秒
例子2:获取已经成功的任务返回,可以用as_completed库
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_html(times):
time.sleep(times)
print("use {} to success".format(times))
return "我运行的时间是{}秒".format(times)
executor = ThreadPoolExecutor(max_workers=2) # 设置线程池里同时运行的数量
# 模拟各线程爬取时间为urls列表
urls = [3, 4, 9, 7]
all_task = [executor.submit(get_html, (url)) for url in urls]
for future in as_completed(all_task):
data = future.result()
print(data)
按每一个线程完成时,分4步输出下面信息
use 3 to success
我运行的时间是3秒
use 4 to success
我运行的时间是4秒
use 7 to success
我运行的时间是7秒
use 9 to success
我运行的时间是9秒
注意:不管urls列表中爬取时间顺序如何,主线程的输出都是按照时间先后顺序输出的
例子3,通过executor获取已经完成的task,需要用到map函数
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
time.sleep(times)
print("use {} to success".format(times))
return "我运行的时间是{}秒".format(times)
executor = ThreadPoolExecutor(max_workers=2) # 设置线程池里同时运行的数量
# 模拟各线程爬取时间为urls列表
urls = [3, 4, 9, 7]
# 通过executor获取已经完成的task, 使用map(),和python中的map函数类似
for data in executor.map(get_html, urls):
print(data)
输出
use 3 to success
我运行的时间是3秒
use 4 to success
我运行的时间是4秒
use 7 to success
use 9 to success
我运行的时间是9秒
我运行的时间是7秒
注意:主线程的输出顺序和urls列表中的时间顺序一样,和上面的例子注意区分
例子4:wait函数,可用于等待某一个任务完成时,输出自定义状态,在例子2的基础上修改如下
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time
def get_html(times):
time.sleep(times)
print("use {} to success".format(times))
return "我运行的时间是{}秒".format(times)
executor = ThreadPoolExecutor(max_workers=2) # 设置线程池里同时运行的数量
# 模拟各线程爬取时间为urls列表
urls = [3, 4, 9, 7]
all_task = [executor.submit(get_html, (url)) for url in urls]
# 添加wait函数,其中的return_when表示第一个线程完成时执行下一行代码
wait(all_task, return_when=FIRST_COMPLETED)
print("main")
for future in as_completed(all_task):
data = future.result()
print(data)
输出
use 3 to success
main
我运行的时间是3秒
use 4 to success
我运行的时间是4秒
use 7 to success
我运行的时间是7秒
use 9 to success
我运行的时间是9秒
注意输出中main的位置,看下wait的源码理解下return_when
三. ThreadPoolExecutor源码分析
1. 未来对象:Future对象
from concurrent.futures import Future: 主要用于作为task的返回容器
现在感觉看不懂,以后再看
四. 多进程和多线程对比
因为多线程只能使用1个核。如果想提高效率,使用多个核运行程序,可以用多进程编程
对于I/O操作,一般使用多线程编程,因为进程切换代价要高于线程
例子1:斐波那契的计算是一个耗CPU的操作,下面用线程和进程执行一个10个数的斐波那契值所需要的时间
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time
def fib(n):
if n<=2:
return 1
return fib(n-1)+fib(n-2)
if __name__ == "__main__":
with ThreadPoolExecutor(3) as executor:
all_task = [executor.submit(fib, (num)) for num in range(25, 35)]
start_time = time.time()
for future in as_completed(all_task):
data = future.result()
#print("fib值为: {}".format(data))
print("多线程用时:{}".format(time.time()-start_time))
with ProcessPoolExecutor(3) as executor:
all_task = [executor.submit(fib, (num)) for num in range(25, 35)]
start_time = time.time()
for future in as_completed(all_task):
data = future.result()
#print("fib值为: {}".format(data))
print("多进程用时 :{}".format(time.time()-start_time))
输出结果
多线程用时:4.100145578384399
多进程用时 :2.9355385303497314
例子2:I/O操作的例子
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time
def random_sleep(n):
time.sleep(n)
return n
if __name__ == "__main__":
with ProcessPoolExecutor(3) as executor:
all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
start_time = time.time()
for future in as_completed(all_task):
data = future.result()
print("多进程用时 :{}".format(time.time()-start_time))
with ThreadPoolExecutor(3) as executor:
all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
start_time = time.time()
for future in as_completed(all_task):
data = future.result()
print("多进程用时 :{}".format(time.time()-start_time))
输出结果如下
多进程用时 :20.17968201637268
多线程用时 :20.00802493095398
虽然差别不大,还是线程快点,而且线程比进程耗费的内存少的多
五. 多进程编程
1. 先来了解下fork函数
fork在linux中用于创建子进程,不能在windows中使用,如下代码存在一个文件比如1.py中
import os
import time
pid = os.fork()
print("jack")
if pid == 0:
print('子进程 {} ,父进程是: {}.' .format(os.getpid(), os.getppid()))
else:
print('我是父进程:{}.'.format(pid))
time.sleep(2)
输出结果
jack
我是父进程:1016.
jack
子进程 1016 ,父进程是: 1015.
说明:
1)执行文件1.py时,会生成一个主进程;代码里的fork()又创建了一个子进程,pid不会是0。所以会先输出前两行的内容
2)1.py的主进程执行完后,会执行里面的子进程,它会复制os.fork()后的所有代码,重新执行一次,所有得到后两行输出
2. multiprocessing来实现多进程,比ProcessPoolExecutor更底层
import multiprocessing
import time
def get_html(n):
time.sleep(n)
print("sub_progress sucess")
return n
if __name__ == "__main__":
# 使用mulproccessing中的线程池
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result = pool.apply_async(get_html, args=(3,)) # 这里的3是给get_html的参数设置为3秒
# 等待所有任务完成
pool.close() #要先把进程池关闭,否则会抛异常
pool.join()
print(result.get())
输出结果
sub_progress sucess
3
3. imap,对应线程中的map,按列表中的时间顺序输出
import multiprocessing
import time
def get_html(n):
time.sleep(n)
print("sub_progress sucess")
return n
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(get_html, [1, 5, 3]): #result为get_html的返回值
print("{} success".format(result))
输出结果
sub_progress sucess
1 success
sub_progress sucess
sub_progress sucess
5 success
3 success
4. imap_unordered方法,按执行时间先后顺序输出
import multiprocessing
import time
def get_html(n):
time.sleep(n)
print("sub_progress sucess")
return n
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap_unordered(get_html, [1, 5, 3]):
print("{} sleep success".format(result))
输出结果
sub_progress sucess
1 sleep success
sub_progress sucess
3 sleep success
sub_progress sucess
5 sleep success
六. 进程间的通信:Queue,Pipe, Manager
1. 使用multiprocessing中的Queue进行通信
import time
from multiprocessing import Process, Queue
def producer(queue):
queue.put("a")
time.sleep(2)
def consumer(queue):
time.sleep(2) #需等待producer执行完再拿数据
data = queue.get()
print(data)
if __name__ == "__main__":
queue = Queue(5)
my_producer = Process(target=producer, args=(queue,))
my_consumer = Process(target=consumer, args=(queue,))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
输出a,说明进程my_producer和进程my_consumer间通信成功
2. 进程池中的进程间通信需要使用Manager实例化中的Queue
import time
from multiprocessing import Pool, Manager
def producer(queue):
queue.put("a")
time.sleep(2)
def consumer(queue):
time.sleep(2)
data = queue.get()
print(data)
if __name__ == "__main__":
queue = Manager().Queue(5) # 使用Manage实例化后的Queue
pool = Pool(2)
pool.apply_async(producer, args=(queue,))
pool.apply_async(consumer, args=(queue,))
pool.close()
pool.join()
输出同样是a,表示通信成功
3. 使用pipe实现进程间通信
from multiprocessing import Process, Pipe
def producer(pipe):
pipe.send("a")
def consumer(pipe):
print(pipe.recv())
if __name__ == "__main__":
recevie_pipe, send_pipe = Pipe()
# pipe只能用于2个进程间的通信
my_producer = Process(target=producer, args=(send_pipe,))
my_consumer = Process(target=consumer, args=(recevie_pipe,))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
输出同样是a
注意:pipe只能用于2个进程间的通信,pipe的性能是高于queue的
4. 进程间使用共享内存
from multiprocessing import Manager, Process
def add_data(p_dict, key, value):
p_dict[key] = value
if __name__ == "__main__":
progress_dict = Manager().dict()
first_progress = Process(target=add_data, args=(progress_dict, "jack", 22))
second_progress = Process(target=add_data, args=(progress_dict, "hong", 34))
first_progress.start()
second_progress.start()
first_progress.join()
second_progress.join()
print(progress_dict)
输出结果
{'jack': 22, 'hong': 34}
说明:
1)结果显示2个进程间的数据合并了,都写入了主进程中的同一个内存中
2)本例子是用dict来做说明,其实Manager()里还有list, tuple等数据结构都可以使用