并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )
并发:并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。
同步:所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。
异步:所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。
阻塞非阻塞:阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。
python的GIL:无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。这两个
Python根据处理事情的类型,是IO密集型还是计算密集型,选择不同的方式,进程,线程、协程,相互配合来用。但是对于计算密集型,。。。
一. 线程:
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。
线程的引入减小了程序并发执行时的开销,提高了操作系统的并发 性能。线程没有自己的系统资源。
<python的线程与threading模块>
直接调用:
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
if __name__ == '__main__':
t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
t1.start() #启动线程
t2.start() #启动另一个线程
print(t1.getName()) #获取线程名
print(t2.getName())
继承式调用:
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定义每个线程要运行的函数
print("running on number:%s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
print("ending......")
join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
setDaemon(True):
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
# run(): 线程被cpu调度后自动执行线程对象的run方法
# start():启动线程活动。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
关于锁:
因为线程能直接操作到进程里的所有变量,如果存在多个线程同时操作同一个变量,存在竞争,最后变量是什么样的都不知道,所以在线程需要操作变量前,进行取锁竞争,
拿到锁的线程才能对变量操作,操作完后释放锁。意味着同时时刻只有一个线程运行那部分代码。
1. threading.Lock() 同步锁
import time
import threading
R=threading.Lock()
def addNum():
global num #在每个线程中都获取这个全局变量
#num-=1
R.acquire()
temp=num
#print('--get num:',num )
time.sleep(0.01)
num =temp-1 #对此公共变量进行-1操作
R.release()
num = 100 #设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print('final num:', num )
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
2. threading.RLock() 递归锁
import threading
import time
class MyThread(threading.Thread):
def actionA(self):
r_lcok.acquire() #count=1
print(self.name,"gotA",time.ctime())
time.sleep(2)
r_lcok.acquire() #count=2
print(self.name, "gotB", time.ctime())
time.sleep(1)
r_lcok.release() #count=1
r_lcok.release() #count=0
def actionB(self):
r_lcok.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(2)
r_lcok.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(1)
r_lcok.release()
r_lcok.release()
def run(self):
self.actionA()
self.actionB()
if __name__ == '__main__':
# A=threading.Lock()
# B=threading.Lock()
r_lcok=threading.RLock()
L=[]
for i in range(5):
t=MyThread()
t.start()
L.append(t)
for i in L:
i.join()
print("ending....")
3. 关于 event事件
Python线程的 event 事件 可以看做是 事件驱动模型。当一个线程通过事件驱动模型发送了一个信号,另一个线程通过事件驱动模型获取了该信号,从而做出反应。
只是这个模型比较简单,发送的信号默认是 false ,要么是 true。
event = threading.Event()
event.is_set() event.isSet() # 获取标志位 是 true 还是 false
event.set() # 设置标志位 为 true
event.clear() # 初始化标志位 为 false
event.wait() # 阻塞等待标志位 为 true 才会继续运行
栗子:
from threading import Thread,Event
import time
event=Event()
def light():
print('红灯正亮着')
time.sleep(3)
event.set() #绿灯亮
def car(name):
print('车%s正在等绿灯' %name)
event.wait() #等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
print('车%s通行' %name)
if __name__ == '__main__':
# 红绿灯
t1=Thread(target=light)
t1.start()
# 车
for i in range(10):
t=Thread(target=car,args=(i,))
t.start()
4. 信号量 —— multiprocess.Semaphore
信号量是比同步锁还能多设置的方式。同步锁只有一把,而信号量可以设置多把锁。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
进程信号量
from multiprocessing import Process,Semaphore
import time,random
def go_ktv(sem,user):
sem.acquire()
print('%s 占到一间ktv小屋' %user)
time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
sem.release()
if __name__ == '__main__':
sem=Semaphore(4)
p_l=[]
for i in range(13):
p=Process(target=go_ktv,args=(sem,'user%s' %i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
print('============》')
# coding: utf-8
import threading
import time
import random
semaphore = threading.Semaphore(0)
def consumer():
print("Consumer is waiting.")
semaphore.acquire()
print("Consumer notify: Consumed item number %s" %item)
def producer():
global item
time.sleep(10)
item = random.randint(0, 100)
print("Producer notify: Produced item number %s" %item)
semaphore.release()
if __name__ == "__main__":
for i in range(0, 5):
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("Program terminated")
5. 队列 queue-----线程利器, 利用队列,组合 生产者消费模型
创建一个“队列”对象
import queue # python 3 是 queue Python2 是 Queue
q = queue.Queue(maxsize = 10)
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
import threading
import time
import queue
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self): # 定义每个线程要运行的函数
num = qq.get() # 队列里没有就会阻塞等待
numb = num -1
time.sleep(0.1)
num = numb
print("running on number:%s" % self.name,num)
qq.put(num)
if __name__ == '__main__':
qq= queue.Queue()
qq.put(100)
tl = []
for i in range(100):
th = MyThread(i)
tl.append(th)
for th in tl:
th.start()
for th in tl:
th.join()
print("ending......", qq.get())
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(5)
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
#q.task_done()
q.join()
print("ok......")
def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
# if not q.empty():
# print("waiting.....")
#q.join()
data = q.get()
print("eating....")
time.sleep(4)
q.task_done()
#print(data)
print('