简简单单的了解一下:
进程就是一个程序在一个数据集上的一次动态执行过程。也就是程序运行的过程。
进程是资源管理单位,管理线程的就是进程。
进程一般由程序、数据集、进程控制块三部分组成:
我们编写的程序是用来描述进程要完成那些功能以及如何完成;
数据集则是程序在执行过程中所需要使用的资源;
进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以通过他来控制和管理进程,他是系统感知进程存在的唯一标志。
线程是进程中执行运算的最小单位,是进程中的一个实体,是被系统独立调度和分派的基本单位。
线程自己不拥有系统资源,只拥有一点在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。
一个线程可以创建和撤销,同一进程中的多个线程之间可以并发执行。
进程与线程的关系:
一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程。
资源分配给进程,同一进程的所有线程共享该进程的所有资源。
cpu分给线程,即真正在cop上运行的是线程。
python中的GIL(全局解释器锁):
全局解释器锁(GIL)每个线程在执行的过程都需要先获取GIL,保证同一时刻只有一个线程可以执行代码
在IO操作可能会引起阻塞的system call之前,可以暂时释放GIL,但在执行完毕后必须重新获取GIL,Python3X使用计时器(执行时间达到阀值后,当前线程释放GIL)或Python2X,tickets计数达到100
在执行CPU密集型数据时,需要持续使用CPU的任务时,单线程会比多线程快
在执行IO密集型数据时可能引起阻塞的任务多线程会比单线程快
嗨嗨皮皮的搞一搞:
线程的调用:
直接调用:
1 import threading,time 2 3 def Output(num): 4 print("Output the number:%s"%num) 5 time.sleep(3) 6 7 if __name__ == '__main__': 8 """ 9 使用threading.Thread方法可以创建一个线程 10 target属性是用来指定执行的动作 11 args属性是用来以元组的方式给执行动作传递参数 12 """ 13 t1 = threading.Thread(target=Output,args=(1,)) # 创建一个线程实例 14 t2 = threading.Thread(target=Output,args=(2,)) # 创建另一个线程实例 15 16 t1.start() # 启动线程 17 t2.start() # 启动另一个线程 18 19 print(t1.getName()) # 获取线程名 20 print(t2.getName())
》》》Output the number:1
Output the number:2
Thread-1
Thread-2
继承式调用:
1 import threading,time 2 3 class MyThread(threading.Thread): 4 def __init__(self,num): 5 threading.Thread.__init__(self) 6 self.num = num 7 8 def run(self): 9 print("Output the number:%s"%self.num) 10 time.sleep(3) 11 12 if __name__ == '__main__': 13 t1 = MyThread(11) 14 t2 = MyThread(22) 15 t1.start() 16 t2.start()
》》》Output the number:11
Output the number:22
thread 模块提供的方法:
1 import threading 2 from time import ctime,sleep 3 4 def music(func): 5 for i in range(2): 6 print("begin listening to %s [%s]"%(func,ctime())) 7 sleep(2) 8 print("end listening [%s]"%ctime()) 9 10 def move(func): 11 for i in range(2): 12 print("Begin watching at the %s [%s]"%(func,ctime())) 13 sleep(5) 14 print("end watching [%s]"%ctime()) 15 16 threads = [] 17 t1 = threading.Thread(target=music,args=('小苹果',)) # 子线程 18 threads.append(t1) 19 t2 = threading.Thread(target=move,args=('还珠格格',)) # 子线程 20 threads.append(t2) 21 22 if __name__ == '__main__': 23 for i in threads: 24 # i.setDaemon(True) # 守护线程 25 i.start() 26 # i.join() # 在子线程完成运行前,这个子线程的父线程将一直被阻塞 27 28 print("all over [%s]"%ctime()) # 父线程
》》》begin listening to 小苹果 [Sat Oct 13 12:44:40 2018]
Begin watching at the 还珠格格 [Sat Oct 13 12:44:40 2018]
all over [Sat Oct 13 12:44:40 2018]
end listening [Sat Oct 13 12:44:42 2018]
begin listening to 小苹果 [Sat Oct 13 12:44:42 2018]
end listening [Sat Oct 13 12:44:44 2018]
end watching [Sat Oct 13 12:44:45 2018]
Begin watching at the 还珠格格 [Sat Oct 13 12:44:45 2018]
end watching [Sat Oct 13 12:44:50 2018]
join()方法:等待至线程中止。
1 import threading 2 from time import ctime,sleep 3 4 def music(func): 5 for i in range(2): 6 print("begin listening to %s [%s]"%(func,ctime())) 7 sleep(2) 8 print("end listening [%s]"%ctime()) 9 10 def move(func): 11 for i in range(2): 12 print("Begin watching at the %s [%s]"%(func,ctime())) 13 sleep(5) 14 print("end watching [%s]"%ctime()) 15 16 threads = [] 17 t1 = threading.Thread(target=music,args=('小苹果',)) # 子线程 18 threads.append(t1) 19 t2 = threading.Thread(target=move,args=('还珠格格',)) # 子线程 20 threads.append(t2) 21 22 if __name__ == '__main__': 23 for i in threads: 24 # i.setDaemon(True) # 守护线程 25 i.start() 26 i.join() # 在子线程完成运行前,这个子线程的父线程将一直被阻塞 27 28 print("all over [%s]"%ctime()) # 父线程
》》》begin listening to 小苹果 [Sat Oct 13 12:45:46 2018]
end listening [Sat Oct 13 12:45:48 2018]
begin listening to 小苹果 [Sat Oct 13 12:45:48 2018]
end listening [Sat Oct 13 12:45:50 2018]
Begin watching at the 还珠格格 [Sat Oct 13 12:45:50 2018]
end watching [Sat Oct 13 12:45:55 2018]
Begin watching at the 还珠格格 [Sat Oct 13 12:45:55 2018]
end watching [Sat Oct 13 12:46:00 2018]
all over [Sat Oct 13 12:46:00 2018]
setDaemon(True)方法:守护线程
1 ''' 2 将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。 3 这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程, 4 主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。 5 如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了, 6 不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦 7 ''' 8 import threading 9 from time import ctime,sleep 10 11 def music(func): 12 for i in range(2): 13 print("begin listening to %s [%s]"%(func,ctime())) 14 sleep(2) 15 print("end listening [%s]"%ctime()) 16 17 def move(func): 18 for i in range(2): 19 print("Begin watching at the %s [%s]"%(func,ctime())) 20 sleep(5) 21 print("end watching [%s]"%ctime()) 22 23 threads = [] 24 t1 = threading.Thread(target=music,args=('小苹果',)) # 子线程 25 threads.append(t1) 26 t2 = threading.Thread(target=move,args=('还珠格格',)) # 子线程 27 threads.append(t2) 28 29 if __name__ == '__main__': 30 for i in threads: 31 i.setDaemon(True) # 守护线程 32 i.start() 33 # i.join() # 在子线程完成运行前,这个子线程的父线程将一直被阻塞 34 35 print("all over [%s]"%ctime()) # 父线程
》》》begin listening to 小苹果 [Sat Oct 13 12:49:17 2018]
Begin watching at the 还珠格格 [Sat Oct 13 12:49:17 2018]
all over [Sat Oct 13 12:49:17 2018]
1 threading.currentThread(): 返回当前的线程变量。 2 threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 3 threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 4 5 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法: 6 run(): 用以表示线程活动的方法。 7 start():启动线程活动。 8 join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 9 isAlive(): 返回线程是否活动的。 10 getName(): 返回线程名。 11 setName(): 设置线程名。
同步锁:
1 import threading,time 2 3 def AddNum(): 4 global num 5 6 num-=1 7 # temp = num 8 # time.sleep(0.001) 9 # num=temp-1 10 11 12 num = 100 #设定一个共享变量 13 tread_list=[] 14 for i in range(100): 15 t = threading.Thread(target=AddNum) 16 t.start() 17 tread_list.append(t) 18 19 for t in tread_list: # 等待所有线程执行完毕后再数据最后的结果 20 t.join() 21 22 print("final num:%s"%num)
》》》final num:0
1 # Author : Adair 2 # -*- coding:utf-8 -*- 3 4 import threading,time 5 6 def AddNum(): 7 global num 8 9 # num-=1 10 temp = num 11 time.sleep(0.001) 12 num=temp-1 13 14 15 num = 100 #设定一个共享变量 16 tread_list=[] 17 for i in range(100): 18 t = threading.Thread(target=AddNum) 19 t.start() 20 tread_list.append(t) 21 22 for t in tread_list: # 等待所有线程执行完毕后再数据最后的结果 23 t.join() 24 25 print("final num:%s"%num)
》》》final num:91
为什么num-=1时结果没问题,而用sleep将计算过程分开结果就会出现问题? 1、因为num-=时动作太快,全局解释器锁(GIL)还没到线程切换的时间,这一个计算过程是在同一次计算过程中完成。 2、sleep明显时间超过了线程的切换时间,100个线程每一个一定都没执行完就进行了切换,sleep相当于IO阻塞,在阻塞时间内不会被切换回来,所以前90多次中线程每次取的num值都是100,因为前一个线程根本就没计算完
遇到这样的问题,我们可以使用同步锁的机制来锁定每次计算过程。当然也可以使用join方法,但join会把整个线程给停住,会让多线程失去意义。
同步锁:
1 import threading,time 2 3 def AddNum(): 4 global num 5 6 lock.acquire() # 获取一个锁 7 temp = num 8 time.sleep(1) 9 num=temp-1 10 lock.release() # 释放这个锁 11 12 13 num = 100 #设定一个共享变量 14 tread_list=[] 15 lock=threading.Lock() # 调用进程锁的方法 16 17 for i in range(100): 18 t = threading.Thread(target=AddNum) 19 t.start() 20 tread_list.append(t) 21 22 for t in tread_list: # 等待所有线程执行完毕后再数据最后的结果 23 t.join() 24 25 print("final num:%s"%num)
》》》final num:0
同步锁与GIL的关系? Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。 GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。 但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。 那我的同步锁也是保证同一时刻只有一个线程被执行,是不是没有GIL也可以?是的;那要GIL有什么鸟用?你没治;
线程的死锁与递归锁:
在线程之间如果存在多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都在正在使用,所以这两个线程在无外力作用下将一直等待下去:
1 import threading, time 2 3 4 class MyThread(threading.Thread): 5 def doA(self): 6 lockA.acquire() # 获取第一把锁 7 print(self.name, "doA--1", time.ctime()) 8 time.sleep(3) 9 lockB.acquire() # 获取第二把锁 10 print(self.name, "doA--2", time.ctime()) 11 lockB.release() # 释放第二把锁 12 lockA.release() # 释放第一把锁 13 14 def doB(self): 15 lockB.acquire() # 获取第二把锁 16 print(self.name, "doB--1", time.ctime()) 17 time.sleep(2) 18 lockA.acquire() # 获取第一把锁 19 print(self.name, "doB--2", time.ctime()) 20 lockA.release() # 释放第一把锁 21 lockB.release() # 释放第二把锁 22 23 def run(self): 24 self.doA() 25 self.doB() 26 27 28 if __name__ == "__main__": 29 lockA = threading.Lock() # 创建第一个锁 30 lockB = threading.Lock() # 创建第二个锁 31 32 threads = [] 33 for i in range(5): # 创建5个线程 34 threads.append(MyThread()) 35 36 for t in threads: 37 t.start() # 启动线程 38 39 for t in threads: 40 t.join() # 等待线程结束
》》》Thread-1 doA--1 Tue Oct 23 23:27:46 2018
Thread-1 doA--2 Tue Oct 23 23:27:49 2018
Thread-1 doB--1 Tue Oct 23 23:27:49 2018
Thread-2 doA--1 Tue Oct 23 23:27:49 2018
....(出现死锁现象,程序一直等待)
为了支持在同一线程中多次请求同一资源,python提供了“可重用锁”也叫“递归锁”(threading.Rlock)。Rlock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有 acquire都被release,其它的线程才能获得资源。
1 import threading, time 2 3 4 class MyThread(threading.Thread): 5 def doA(self): 6 lock.acquire() # 获取第一把重用锁 7 print(self.name, "doA--1", time.ctime()) 8 time.sleep(3) 9 lock.acquire() # 获取第一把重用锁 10 print(self.name, "doA--2", time.ctime()) 11 lock.release() # 释放一把重用锁 12 lock.release() # 释放一把重用锁 13 14 def doB(self): 15 lock.acquire() # 获取第一把重用锁 16 print(self.name, "doB--1", time.ctime()) 17 time.sleep(2) 18 lock.acquire() # 获取第一把重用锁 19 print(self.name, "doB--2", time.ctime()) 20 lock.release() # 释放一把重用锁 21 lock.release() # 释放一把重用锁 22 23 def run(self): 24 self.doA() 25 self.doB() 26 27 28 if __name__ == "__main__": 29 # lockA = threading.Lock() # 创建第一个锁 30 # lockB = threading.Lock() # 创建第二个锁 31 lock=threading.RLock() # 创建重用锁/递归锁 32 33 threads = [] 34 for i in range(5): # 创建5个线程 35 threads.append(MyThread()) 36 37 for t in threads: 38 t.start() # 启动线程 39 40 for t in threads: 41 t.join() # 等待线程结束
信号量:
信号量是用来控制并发数的,信号量中管理这一个内置的计数器,没当调用acquire()时计数器上-1,调用release()时+1.
注意计数器不能小于 0,当计数器为0时,acquire()将阻塞线程至同步状态,直到其他线程调用release()
1 import threading,time 2 3 class MyThread(threading.Thread): 4 5 def __init__(self,num): 6 threading.Thread.__init__(self) 7 self.num = num 8 9 def run(self): 10 semaphore.acquire() 11 print(self.name,self.num) 12 time.sleep(2) 13 semaphore.release() 14 15 16 if __name__ == "__main__": 17 Threads = [] 18 semaphore=threading.BoundedSemaphore(3) 19 20 for t in range(10): 21 Threads.append(MyThread(t)) 22 for t in Threads: 23 t.start()
举个栗子:
信号量就好比停车场,而threading.BoundedSemaphore(3)就好比设置了停车场里只有3个车位,停车场一次最多只能停三辆车,其他车辆只能在外等待。当有一辆车离开停车场后等待的一辆车才可以进入停车场。依次循环。
信号量一般用于对数据库连接数的限制
条件变量同步:
python还提供了threading.Condition()对象用于条件变量线程的支持,设置线程满一定足条件后才能够继续执行。
创建条件变量同步锁:lock=threading.Condition() 括号中需要加载锁类型(Lock/Rlock),若括号中什么都不填则默认为Rlock()
方法:
wait() :条件不满足时调用,线程会释放锁并进入等待阻塞
notify():条件创造后调用,通知等待池激活一个线程
notifyAll():条件创造后调用,通知等待池激活所有线程
1 import threading,time,random 2 3 class producers(threading.Thread): 4 global ProductContainers 5 def run(self): 6 while True: 7 lock.acquire() 8 ProductNum = random.randint(0,10) 9 ProductContainers.append(ProductNum) 10 print("产品集中有:",ProductContainers) 11 lock.notify() 12 lock.release() 13 time.sleep(2) 14 15 class consumers(threading.Thread): 16 global ProductContainers 17 def run(self): 18 while True: 19 lock.acquire() 20 if len(ProductContainers) == 0: 21 lock.wait() 22 print("线程 %s 消费了产品 %d"%(self.name,ProductContainers[0])) 23 del ProductContainers[0] 24 lock.release() 25 time.sleep(1) 26 27 28 29 if __name__ == "__main__": 30 ProductContainers=[] 31 lock = threading.Condition() 32 threads=[] 33 34 35 for t in range(3): 36 threads.append(producers()) 37 for t in range(4): 38 threads.append(consumers()) 39 40 for t in threads: 41 t.start() 42 43 for t in threads: 44 t.join()
同步条件:
条件同步和条件变量同步差不多,只是少了锁的功能,因为条件同步设计与不访问共享资源的条件环境。
event=threading.Event() #条件环境对象,初始值为False event.isSet() # 返回event的状态值 event.wait() # 如果event.isSet()==False将线程阻塞 event.set() # 设置even的状态为True,所有阻塞池的线程激活进入就绪状态,等待操作系统的调度 event.clear() # 恢复event的状态值为False
1 import threading,time 2 3 class Boos(threading.Thread): 4 def run(self): 5 print("Boos:今天大家加班到22:00!",event.isSet()) 6 event.isSet() or event.set() 7 print("Boos",event.isSet()) 8 time.sleep(1) # 等待时线程进行切换 9 print("Boos:哪算了,下班吧!",event.isSet()) 10 event.isSet() or event.set() 11 12 class Worker(threading.Thread): 13 def run(self): 14 print("Worker",event.isSet()) 15 event.wait() 16 print("Worker:哎呦,卧槽!") 17 event.clear() 18 event.wait() 19 print("Worker:搜嘎!") 20 21 22 if __name__ == "__main__": 23 Treads=[] # 列表是有序的 24 event=threading.Event() 25 26 for w in range(5): 27 Treads.append(Worker()) 28 Treads.append(Boos()) 29 30 for t in Treads: 31 t.start()