Queues
A queue is first-in, first-out (FIFO): values are stored in arrival order, and retrieval returns the earliest value stored, removing it from the queue:

import queue

q = queue.Queue(10)
print(bool(q))
q.put(11)
q.put(22)
q.put(33)
print(q.qsize())
print(q.get())
print(q.qsize())

Output:
True
3
11
2
Data is stored with put and retrieved with get. Both accept a timeout, and both can be blocking or non-blocking; blocking is the default.

import queue

q = queue.Queue(10)
print(bool(q))
q.put(11)
q.put(22)
q.put(33, block=False)
q.put(44, block=False, timeout=2)  # timeout is ignored when block=False
print(q.qsize())
print(q.get())
print(q.get())
q.put(44, block=False, timeout=2)
print(q.get(timeout=2))
print(q.get())

Output:
True
4
11
22
33
44
q = queue.Queue(5)
q.put(123)
# q.put(456)
print(q.get())
# q.task_done()
print(q.get(timeout=1))  # the queue is now empty, so this raises after 1 second
# q.task_done()
# q.join()

Output (raises an exception):
raise Empty
queue.Empty
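The commented-out task_done and join calls above hint at another feature: Queue.join blocks until every item that was put has been marked finished with task_done. A minimal sketch of that pattern (the worker function, the sentinel, and the item values are made up for illustration):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    # Keep pulling items until the producer sends the None sentinel.
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()  # tell the queue this item is fully processed

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)
q.put(None)   # sentinel to stop the worker
q.join()      # blocks until task_done was called once per put
t.join()
print(results)  # [0, 2, 4, 6, 8]
```

With a single worker the items are processed in FIFO order, so the result list is deterministic.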
Queues can also operate last-in, first-out (LIFO), via queue.LifoQueue:

import queue

q = queue.LifoQueue()
q.put(11)
q.put(22)
print(q.get())

Output:
22
Queues can also carry priorities (weights). Note that queue.PriorityQueue retrieves the item with the smallest priority value first:

import queue

q = queue.PriorityQueue()
q.put((1, '11'))
q.put((2, '21'))
q.put((1, '12'))
q.put((2, '22'))
q.put((1, '10'))
for i in range(q.qsize()):
    print(q.get())

Output:
(1, '10')
(1, '11')
(1, '12')
(2, '21')
(2, '22')
There is also a double-ended queue, which lets you add and remove values from either end (deque actually lives in the collections module; queue.deque works only because the queue module re-imports it):

from collections import deque

q = deque()
q.append(123)
q.append(456)
q.appendleft(789)
q.appendleft(101)
print(q.pop())
# print(q.popleft())
print(q.pop())
print(q.pop())
print(q.pop())

Output:
456
123
789
101
Locks
Python threads have locks. When a thread reaches a locked code block, it checks whether the lock is held; if so, it blocks and waits for the earlier thread to release the lock, then continues:

import threading, time

NUM = 10

def f1(l, lr):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(1)
    print(NUM)
    l.release()

lock = threading.Lock()
lockr = threading.RLock()  # reentrant lock; passed along but unused here
for i in range(10):
    t = threading.Thread(target=f1, args=(lock, lockr))
    t.start()

Output:
9
8
7
6
5
4
3
2
1
0
If only one thread at a time may run the block, the use case becomes quite narrow. Python also provides semaphores, which allow up to a fixed number of threads to hold the lock at once; any threads beyond that limit block until a slot frees up.

import threading, time

NUM = 10

def f1(l):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    l.release()

locks = threading.BoundedSemaphore(2)  # at most two threads at once
for i in range(10):
    t = threading.Thread(target=f1, args=(locks,))
    t.start()

Output:
8
7
6
6
4
3
2
1
0
0

Because two threads modify NUM concurrently, some values repeat in the output.
Event can hold all threads back at once and then, when some condition is met, release every blocked thread together. This covers the "unlock everything at once" scenario:

import threading, time

def f1(ar, e):
    print(ar)
    e.wait()          # blocks until the event is set
    print(ar + 100)

ev = threading.Event()
for i in range(10):
    t = threading.Thread(target=f1, args=(i, ev))
    t.start()
ev.clear()            # default state; threads block while cleared
inp = input(">>>")
if inp == '1':
    ev.set()          # releases every waiting thread
Similarly, Condition lets you release a chosen number of waiting threads once a condition is met:

import threading

def func(i, con):
    print(i)
    con.acquire()
    con.wait()
    print(i + 100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i, c))
    t.start()

while True:
    inp = input('>>>>')
    if inp == 'q':
        break
    c.acquire()
    c.notify(int(inp))  # wake up that many waiting threads
    c.release()
Threads, Processes, and Coroutines
A process can contain multiple threads, and threads depend on their process to exist. A running process may need to perform several operations concurrently, which is where multithreading comes in. Thread basics were covered earlier, but a process cannot simply spawn as many threads as it wants: each thread consumes system resources. Hence the thread-pool concept. A thread pool is still multithreading, but it adds thread reuse, queuing of pending tasks, thread timeouts, and so on.
The number of threads is defined up front. When a task request arrives, an idle thread is dispatched to handle it; when the task finishes, the thread is released and picks up the next request from the task queue. This uses system resources much more efficiently. Below is a stripped-down version of a thread pool.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: hft
import threading, time, queue

q = queue.Queue(20)

def pro(arg):
    # Producer: keeps putting items on the shared queue.
    while True:
        q.put(str(arg) + ' - bun')
        time.sleep(1)

def summer(arg):
    # Consumer: takes two items, then exits.
    for i in range(2):
        print(str(arg) + '-' + str(i) + '_' + q.get())
        time.sleep(2)

for i in range(3):
    t = threading.Thread(target=pro, args=(i,))
    t.start()

for j in range(20):
    t = threading.Thread(target=summer, args=(j,))
    t.start()
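The standard library also ships a ready-made thread pool in concurrent.futures. A minimal sketch, where the worker function and its inputs are made up for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    # The work each pool thread performs.
    return n * n

# At most 3 threads run at once; extra tasks wait in the pool's internal queue.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(square, range(10)))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

pool.map preserves input order regardless of which thread finishes first, and the with block waits for all tasks before exiting.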
On Windows, process-spawning code must live under the main guard (if __name__ == '__main__'); on Linux and macOS it can appear elsewhere.

from multiprocessing import Process, queues
import multiprocessing

def foo(i, arg):
    arg.put(i)
    print('say hi', i, arg.qsize())

if __name__ == '__main__':
    li = queues.Queue(20, ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo, args=(i, li))
        # p.daemon = True
        p.start()
        # p.join()
The Process class from the multiprocessing module is instantiated to create a process object, and start launches it. The daemon flag (default False) marks the child as a daemonic process: daemonic children are terminated automatically when the main process exits, and they are not allowed to spawn children of their own. The join method blocks the main process (statements after join cannot run) until the child process finishes.
The following simulates a multi-process scenario:

from multiprocessing import Process, Array

def foo(i, arg):
    arg[i] = i + 100
    for item in arg:
        print(item, end=" ")
    print()

if __name__ == '__main__':
    li = Array('i', 10)
    for i in range(10):
        p = Process(target=foo, args=(i, li))
        p.start()

Output:
0 0 0 0 0 0 106 0 0 0
0 0 0 0 0 0 106 107 0 0
0 0 102 0 0 0 106 107 0 0
0 0 102 103 0 0 106 107 0 0
0 0 102 103 0 0 106 107 108 0
0 101 102 103 0 0 106 107 108 0
100 101 102 103 0 0 106 107 108 0
100 101 102 103 104 0 106 107 108 0
100 101 102 103 104 105 106 107 108 0
100 101 102 103 104 105 106 107 108 109
This shows that processes can share state. The example above used a shared array; below is another sharing example built on a Manager-backed dict, followed by a process pool:

from multiprocessing import Process, Manager

def foo(i, arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == '__main__':
    obj = Manager()   # the Manager process serves the shared dict
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li))
        p.start()
    import time
    time.sleep(0.1)   # keep the Manager alive until the children finish

The pool (the original draft used multiprocessing.dummy, which is thread-backed; multiprocessing.Pool gives real processes):

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(10)
    print(arg)

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(30):
        pool.apply_async(func=f1, args=(i,))
    # pool.close()      # close() would instead let queued tasks finish
    time.sleep(1)
    pool.terminate()    # kill the workers immediately
    pool.join()
As for coroutines, I searched a lot online and little of it captures what they are really for; the definition below is taken from Baidu.
Here are two coroutine examples, one using greenlet and one using gevent:

from greenlet import greenlet

def test1():
    print("test1", 1)
    gr2.switch()     # hand control to gr2
    print("test1", 2)
    gr2.switch()

def test2():
    print('test2', 1)
    gr1.switch()     # hand control back to gr1
    print('test2', 2)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://www.github.com/'),
])
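For reference, since Python 3.5 the standard library has native coroutines via async/await, with no third-party packages required. A minimal sketch of the same cooperative-switching idea; the coroutine names and delay values are arbitrary:

```python
import asyncio

async def worker(name, delay):
    # await suspends this coroutine, letting the other one run meanwhile.
    await asyncio.sleep(delay)
    return '%s finished' % name

async def main():
    # Run both coroutines concurrently on a single thread.
    return await asyncio.gather(
        worker('a', 0.2),
        worker('b', 0.1),
    )

results = asyncio.run(main())
print(results)  # ['a finished', 'b finished']
```

asyncio.gather returns results in the order the coroutines were passed in, even though worker 'b' finishes first.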