1.线程queue :会有锁
q=queue.Queue(3)
q.get()
q.put()
先进先出 队列
后进先出 堆栈
优先级队列
1 """先进先出 队列"""
2 import queue
3 q=queue.Queue(3) #先进先出->队列
4
5 q.put('first')
6 q.put(2)
7 # q.put('third')
8 # q.put(4)
9 q.put(4,block=False) #q.put_nowait(4)
10 # q.put_nowait(4)
11 # q.put(4,block=True) # True 阻塞 False 不阻塞 直接告诉你 队列满了
12 # q.put(4,block=True,timeout=3) # 阻塞等待3秒 还没有拿走数据就抛异常
13 #
14 print(q.get())
15 print(q.get())
16 print(q.get())
17 print(q.get(block=True,timeout=2)) # false 不阻塞没有数据就抛异常 默认是阻塞 block=True
18 print(q.get_nowait()) # 相当于block=false
19 # def get(self, block=True, timeout=None):
20
21
22 """后进先出 堆栈"""
23 import queue
24 q=queue.LifoQueue(3) #后进先出->堆栈
25 q.put('first')
26 q.put(2)
27 q.put('third')
28
29 print(q.get())
30 print(q.get())
31 print(q.get())
32
33 """优先级队列 """
34 import queue
35 q=queue.PriorityQueue(3) #优先级队列
36
37 q.put((10,{'alice':12})) # 数字越小 优先级越高 优先拿出来
38 q.put((40,'two'))
39 q.put((30,'three'))
40
41 print(q.get())
42 print(q.get())
43 print(q.get())
2.线程池进程池:
client server 是IO 操作应该用多线程
计算密集型: 用多进程
io密集型:用多线程
池:对数目加以限制,保证机器正常运行
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
2 import os,time,random
3
4 def task(name):
5 print('name:%s pid:%s run' %(name,os.getpid()))
6 time.sleep(random.randint(1,3))
7
8
9 if __name__ == '__main__':
10 pool=ProcessPoolExecutor(4) # 不指定 默认是cpu的核数
11 # pool=ThreadPoolExecutor(5)
12
13 for i in range(10):
14 pool.submit(task,'egon%s' %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
15
16 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口封死了 计数器-1
17
18
19 print('主')
20 """
21 主 # # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
22 name:egon0 pid:60056 run # 只有4个pid
23 name:egon1 pid:64700 run
24 name:egon2 pid:59940 run
25 name:egon3 pid:60888 run
26
27 name:egon4 pid:60888 run
28
29 name:egon5 pid:60056 run
30 name:egon6 pid:60888 run
31
32 name:egon7 pid:60056 run
33 name:egon8 pid:64700 run
34 name:egon9 pid:59940 run
35 """
36 # pool.shutdown(wait=True) # 代表往池子里面丢任务的入口封死了 计数器-1
37 """
38 name:egon0 pid:57124 run
39 name:egon1 pid:62252 run
40 name:egon2 pid:55736 run
41 name:egon3 pid:62060 run
42 name:egon4 pid:57124 run
43 name:egon5 pid:62252 run
44 name:egon6 pid:55736 run
45 name:egon7 pid:55736 run
46 name:egon8 pid:62060 run
47 name:egon9 pid:55736 run
48 主
49 """
50
51 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
52 from threading import currentThread
53 import os,time,random
54
55 def task():
56 print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))
57 time.sleep(random.randint(1,3))
58
59
60 if __name__ == '__main__':
61 pool=ThreadPoolExecutor(5)
62
63 for i in range(10):
64 pool.submit(task)
65
66 pool.shutdown(wait=True)
67
68
69 print('主')
70 """
71 name:ThreadPoolExecutor-0_0 pid:61508 run
72 name:ThreadPoolExecutor-0_1 pid:61508 run
73 name:ThreadPoolExecutor-0_2 pid:61508 run
74 name:ThreadPoolExecutor-0_3 pid:61508 run
75 name:ThreadPoolExecutor-0_4 pid:61508 run
76 name:ThreadPoolExecutor-0_2 pid:61508 run
77 name:ThreadPoolExecutor-0_4 pid:61508 run
78 name:ThreadPoolExecutor-0_0 pid:61508 run
79 name:ThreadPoolExecutor-0_3 pid:61508 run
80 name:ThreadPoolExecutor-0_1 pid:61508 run
81 主
82 """
3.异步调用与回调机制:
提交任务的两种方式:
同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行,效率低
异步调用:提交完任务后,不等待任务执行完毕。异步调用+回调机制 自动触发叫回调
1 """同步调用"""
2 from concurrent.futures import ThreadPoolExecutor
3 import time
4 import random
5
6 def la(name):
7 print('%s is laing' %name)
8 time.sleep(random.randint(3,5))
9 res=random.randint(7,13)*'#'
10 return {'name':name,'res':res}
11
12 def weigh(shit):
13 name=shit['name']
14 size=len(shit['res'])
15 print('%s 拉了 《%s》kg' %(name,size))
16
17
18 if __name__ == '__main__':
19 pool=ThreadPoolExecutor(13)
20
21 shit1=pool.submit(la,'alex').result()
22 weigh(shit1)
23
24 shit2=pool.submit(la,'wupeiqi').result()
25 weigh(shit2)
26
27 shit3=pool.submit(la,'yuanhao').result()
28 weigh(shit3)
29
30
31 """异步调用 + 回调机制 自动触发叫回调"""
32 from concurrent.futures import ThreadPoolExecutor
33 import time
34 import random
35
36 def la(name):
37 print('%s is laing' %name)
38 time.sleep(random.randint(3,5))
39 res=random.randint(7,13)*'#'
40 return {'name':name,'res':res}
41 # weigh({'name':name,'res':res}) # 这样写不好 所有功能 写在一起了
42
43
44 def weigh(shit):
45 shit=shit.result() # 拿到是 对象 需要result()
46 name=shit['name']
47 size=len(shit['res'])
48 print('%s 拉了 《%s》kg' %(name,size))
49
50
51 if __name__ == '__main__':
52 pool=ThreadPoolExecutor(13)
53
54 # pool.submit(la, 'alex')
55 # pool.submit(la, 'wupeiqi')
56 # pool.submit(la, 'yuanhao')
57
58 pool.submit(la,'alex').add_done_callback(weigh) # 实现了程序的解耦合
59 pool.submit(la,'wupeiqi').add_done_callback(weigh)
60 pool.submit(la,'yuanhao').add_done_callback(weigh)
4.异步调用与回调机制应用:
pip3 install requests
requests
异步调用+回调机制的 应用场景:
1 from concurrent.futures import ThreadPoolExecutor
2 import requests
3 import time
4
5 def get(url): # io操作 基于线程 数目有限 用线程池
6 print('GET %s' %url)
7 response=requests.get(url)
8 time.sleep(3)
9 return {'url':url,'content':response.text}
10
11
12 def parse(res):
13 res=res.result()
14 print('%s parse res is %s' %(res['url'],len(res['content'])))
15
16
17 if __name__ == '__main__':
18 urls=[
19 'http://www.cnblogs.com/linhaifeng',
20 'https://www.python.org',
21 'https://www.openstack.org',
22 ]
23
24 pool=ThreadPoolExecutor(2)
25
26 for url in urls:
27 pool.submit(get,url).add_done_callback(parse)