进程与线程
- 进程:计算机程序只是存储在磁盘上可执行的二进制文件。只有把他们加载到内存中并被操作系统调用,才拥有生命周期。进程则是一个执行中的程序。每个进程都有自己的地址空间,内存,数据栈以及其他用于跟踪执行的辅助数据。操作系统管理其上所有进程的执行,并为这些进程合理地分配时间。进程可以通过派生新的进程来执行其他任务。各个进程之间不能直接通信。
- 线程:可以被称作轻量级进程,他们是在同一个进程下面执行的,并共享上下文。线程包括开始,执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占和临时挂起,这种行为成为让步。 一个进程中的各个线程与主进程共享同一片内存空间,因此相对于独立的进程而言,线程间的信息共享和通信更加容易。
线程一般是以并发方式执行的,正是由于这种并发和数据共享机制,使得多任务之间的协作成为可能。但是,在单核cpu中,真正的并发是不可能的,所以多线程的执行实际上是这样的:每个线程运行一会,然后让步给其余的线程,在某个时间点只有一个线程执行。因为上下文的切换时间很快,因此对用户来说就是类似的多并发执行。python的并发就是采用的这种执行效果。
因为线程之间共享同一片数据空间,可能会存在两个或多个线程对同一个数据进行运算的情况,这样就会导致运算结果不一致的情况,这种情况称为竟态条件。可以使用锁来解决这样的问题。
python的全局解释器
python代码的执行是由python虚拟机进行控制的。python在设计时是这样考虑的,在主循环中同时只能有一个控制线程在执行,就像单核cpu系统中的多进程一样。内存中可以有很多程序,但是在任意给定的时刻只能有一个程序在运行。
对python虚拟机的访问是由全局解释器锁GIL控制的,这个锁用来保证同事只能有一个线程运行。在多线程环境中,python虚拟机将按照下面所述的方式执行。
- 设置GIL
- 切换进一个进程中执行
- 执行下面操作之一
- 指定数量的字节码指令
- 线程主动让出控制权
- 把线程设置回睡眠状态
- 解锁GIL
- 重复以上步骤
python2.x中虽然有thread和threading两个模块,但推荐使用threading模块,并且在python3.x中只有threading模块,因此这里只说明threading模块的用法。
在使用threading模块中thread类来启动多线程时,可以使用以下三种方法:
- 创建Thread实例,传给他一个函数。
- 创建thread实例,传给他一个可调用的类实例。
- 派生thread子类,并创建子类的实例。
经常使用的第一个或者第三个
第一种:
import threading from time import sleep, ctime loops = [4,2] def loop(nloop, nsec): print "start loop", nloop, "at:", ctime() sleep(nsec) print "loop", nloop, "done at:", ctime() def main(): print 'starting at:', ctime() threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=loop, args=(i,loops[i])) threads.append(t) for i in nloops: threads[i].start() # for i in nloops: # 注意有无join的区别 # threads[i].join() print "all Done at:" , ctime() if __name__ == "__main__": main()
上述代码中利用threading模块的Thread类开启了两个线程,然后把开启的线程放入列表中,循环列表开始执行每一个线程。其中使用了join()方法,join的作用是等待所有的子线程完成之后,再继续执行主函数。可以对join代码进行注释以测试join函数的作用。
第二种:
import threading from time import ctime, sleep loops = [4,2] class MyThread(object): def __init__(self, func, args, name = "",): self.name = name self.func = func self.args = args def __call__(self): self.func(*self.args) def loop(nloop, nsec): print "start loop", nloop, "at:", ctime() sleep(nsec) print "loop", nloop, "done at:", ctime() def main(): print("starting at: ", ctime()) threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=MyThread(loop, (i, loops[i]), loop.__name__)) # 创建Thread类实例,然后把要传入的函数及参数封装成一个类实例传入。注意创建的类需调用内置的__call__方法。 # 注意内置__call__方法怎么调用 # A = class(object): # def __call__(self): # print("call") # a = A() # a() # 调用内置call方法 threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print("all done!", ctime()) main()
注意:这里在实例化Thread对象时,同事实例化了自己创建的MyThread类,因此这里实际上完成了两次实例化。
注意:__call__函数的调用,在代码中已经给出调用的方法。
第三种:
import threading from time import ctime, sleep loops = [4,2] class MyThread(threading.Thread): def __init__(self, func, args, name = "",): super(MyThread,self).__init__() self.name = name self.func = func self.args = args def run(self): self.func(*self.args) def loop(nloop, nsec): print "start loop", nloop, "at:", ctime() sleep(nsec) print "loop", nloop, "done at:", ctime() def main(): print("starting at: ", ctime()) threads = [] nloops = range(len(loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.__name__) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print("all done!", ctime()) main()
注意:
- 新创建的类继承了基类Thread,必须对构造函数进行重写。
- 必须重写run方法,run函数就是每一个线程都要运行的函数。
daemon设置守护线程
import time import threading def run(n): print('[%s]------running---- ' % n) time.sleep(2) print('--done--') def main(): for i in range(5): t = threading.Thread(target=run, args=[i, ]) t.start() t.join(1) print('starting thread', t.getName()) m = threading.Thread(target=main, args=[]) # m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务 m.start() m.join(timeout=2) print("---main thread done----")
解释:在没有设置守护线程时,主线程退出时,开辟的子线程仍然会执行;若设置为守护进程,则在子线程退出是,对应的子线程会直接退出。
没有设置守护线程结果如下:
设置守护线程执行结果如下:
单线程与多线程之间执行效率对比:
# *-* coding:utf-8 *-* # Auth: wangxz from MyThread import MyThread from time import ctime, sleep def fib(x): sleep(0.005) if x < 2: return 1 return (fib(x - 2) + fib(x - 1)) def fac(x): sleep(0.05) if x < 2: return 1 return (x * fac(x - 1)) def sum(x): sleep(0.05) if x < 2: return 1 return (x + sum(x - 1 )) func = [fib, fac, sum] n = 12 def main(): nfunc = range(len(func)) print("single thread".center(50,"-")) for i in nfunc: print("The %s starting at %s" % (func[i].__name__, ctime())) print(func[i](n)) print("The %s stop at %s" % (func[i].__name__, ctime())) print("Multiple threads".center(50,"-")) threads = [] for i in nfunc: t = MyThread(func[i],(n,),func[i].__name__) threads.append(t) for i in nfunc: threads[i].start() for i in nfunc: threads[i].join() print(threads[i].getResult()) print("all Done!") main()
注意:因为执行太快因此在程序中加入的sleep函数,减慢执行的速度。
线程中的同步原语
在之前我们提到过多个线程之间,共享一段内存空间,因此当多个线程同时修改内存空间中的某个数值的时候,可能会出现不能达到预期效果的情况。针对这种情况,我们可以使用threading模块中的锁来控制线程的修改。
python的多线程适合用于IO密集型的任务当中,而不适合CPU密集型。其中IO操作不占用CPU,而计算占用CPU。
python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
多线程中锁的争用问题,https://www.cnblogs.com/lidagen/p/7237674.html
下面实例摘自:网络编程基础一书
#!/usr/bin/env python #*-* coding:utf-8 *-* import threading, time a = 50 b = 50 c = 50 d = 50 def printvars(): print "a = ", a print "b = ", b print "c = ", c print "d = ", d def threadcode(): global a, b, c, d a += 50 time.sleep(0.01) b = b + 50 c = 100 d = "Hello" print "[ChildThread] values of variabled in child thread:" printvars() print "[Mainchild] values of variables before child thread:" printvars() #create new thread t = threading.Thread(target = threadcode, name = "childThread") #This thread won't keep the program from terminating. t.setDaemon(1) #start the new thread t.start() #wait for the child thread to exit. t.join() print "[MainThread] values of variables after child thread:" printvars()
程序开始的时候定义了4个变量。他们的值都是50,显示这些值,然后建立一个线程,线程使用不同的方法改变每个变量,输出变量,然后终止。主线程接着在join()和再次打印出值后取得控制权。主线程没有修改这些值。
函数执行结果如下:
[Mainchild] values of variables before child thread:
a = 50
b = 50
c = 50
d = 50
[ChildThread] values of variabled in child thread:
a = 100
b = 100
c = 100
d = Hello
[MainThread] values of variables after child thread:
a = 100
b = 100
c = 100
d = Hello
分析:如果两个不同的线程同时给50加b,那么结果就会有以下两种情况。
- 150, 如果一个线程在另外一个线程之前运行,且都可以加50.
- 100,。如果两个线程同时执行计算。在这里,两个线程同时得到b=50的值,计算加50后的新值,并把新值写回b。在计算a+=50时,它的结果总是150.因为在整型上执行+=,被认为是原子的。系统会保证操作在其他任何线程开始之前结束。
为了防止这种争用的现象,引入了锁的概念。
python的threading模块提供了一个Lock对象。这个对象可以被用来同步访问代码。Lock对象有两个方法,acquire()和release()。acquire()方法负责取得一个锁,如果没有线程正持有锁,acquire()方法会立刻得到锁,否则,它需要等到锁被释放。
release()会释放一个锁,如果有其他的线程正等待这个锁(通过acquire),当release释放的时候,他们中的一个线程就会被唤醒,也就是说,某个线程中的acquire()将会被调用。
实例如下:
#!/usr/bin/env python #*-* coding:utf-8 *-* import threading, time #Initialize a aimple variable b = 50 #And a lock object l = threading.Lock() def threadcode(): """This is run in the created threads""" global b print "Thread %s invoked" % threading.currentThread().getName() # Acquire the lock (will not return until a lock is acquired) l.acquire() try: print "Thread %s running" % threading.currentThread().getName() time.sleep(1) b = b + 50 print "Thread %s set b to %d" % (threading.currentThread().getName(),b) finally: l.release() print "Value of b at start of program:", b childthreads = [] for i in xrange(1, 5): # create new thread t = threading.Thread(target = threadcode, name = "Thread-%d" % i) #This thread won't keep the program from terminating t.setDaemon(1) #Start the new thread t.start() childthreads.append(t) for t in childthreads: #wait for the child thread to exit: t.join() print "New value of b:", b -------------------------------------------------------------------------------- Value of b at start of program: 50 Thread Thread-1 invoked Thread Thread-1 running Thread Thread-2 invoked Thread Thread-3 invoked Thread Thread-4 invoked Thread Thread-1 set b to 100 Thread Thread-2 running Thread Thread-2 set b to 150 Thread Thread-3 running Thread Thread-3 set b to 200 Thread Thread-4 running Thread Thread-4 set b to 250 New value of b: 250
以上加的需要等到阻塞的锁,也可以称为互斥锁。
实例:之前看到过一片博文,但是把博文地址忘了,博文中的实例大概如下:
#!/usr/bin/env python import threading import time a = 1 def addNum(): global a b = a time.sleep(0.001) # 记得这里是用停顿,模拟大量的IO操作 a = b + 1 time.sleep(1) threads = [] for i in range(100): t = threading.Thread(target=addNum) t.start() threads.append(t) for t in threads: t.join() -----------执行--------------- [root@ct02 ~]# python one.py The end num is 9 [root@ct02 ~]# python one.py The end num is 11 [root@ct02 ~]# python one.py The end num is 10 [root@ct02 ~]# python one.py The end num is 11
然后加入锁之后:
cat one.py #!/usr/bin/env python import threading import time a = 1 def addNum(): global a lock.acquire() b = a time.sleep(0.001) a = b + 1 lock.release() time.sleep(1) threads = [] lock = threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() threads.append(t) for t in threads: t.join() print "The end num is %s" % a
死锁
可以参考博文:https://blog.csdn.net/u013210620/article/details/78723704
如果一个线程想不止一次访问某一共享资源,可能就会发生死锁现象。第一个acquire没有释放,第二次的acquire就继续请求。
#!/usr/bin/env python #*-* coding:utf-8 *-* import threading import time def foo(): lockA.acquire() print "foo获得A锁" lockB.acquire() print "foo获得B锁" lockB.release() lockA.release() def bar(): lockB.acquire() print "bar获得A锁" time.sleep(2) # 模拟io或者其他操作,第一个线程执行到这,在这个时候,lockA会被第二个进程占用 # 所以第一个进程无法进行后续操作,只能等待lockA锁的释放 lockA.acquire() print "bar获得B锁" lockB.release() lockA.release() def run(): foo() bar() lockA=threading.Lock() lockB=threading.Lock() for i in range(10): t=threading.Thread(target=run,args=()) t.start() 输出结果:只有四行,因为产生了死锁阻断了 foo获得A锁 foo获得B锁 bar获得A锁 foo获得A锁
第一个线程(系统中产生的第一个)获得A锁,获得B锁,因此会执行foo函数,然后释放得到的锁。接着执行bar函数,获得B锁(未释放),执行 print "bar获得A锁"语句,然后进入等待;第二个线程执行foo函数,获得A锁,但获得B锁时候需要等待。
#!/usr/bin/env python #coding:utf-8 from threading import Thread,Lock,RLock import time mutexA=mutexB=RLock() # 注意只能这样连等的生成锁对象 class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到A锁' %self.name) mutexB.acquire() print('%s 拿到B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到B锁' % self.name) time.sleep(0.1) mutexA.acquire() print('%s 拿到A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(5): t=MyThread() t.start()
信号量semaphore
信号量是python的一个内置计数器。
每当调用acquire()时,内置计数器-1
, 每当调用release()时,内置计数器+
1
,计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
如果不加入计数,代码如下:
#!/usr/bin/env python #coding:utf-8 import threading, time def hello(): print "Hello wolrd at %s" % time.ctime() time.sleep(2) for i in range(12): t = threading.Thread(target=hello) t.start() -------------执行----------------- Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018 Hello wolrd at Mon May 28 11:46:18 2018
加入计数,代码执行如下:
#!/usr/bin/env python #coding:utf-8 import threading, time sem = threading.BoundedSemaphore(3) # 加入计数 def hello(): sem.acquire() print "Hello wolrd at %s" % time.ctime() time.sleep(2) sem.release() for i in range(12): t = threading.Thread(target=hello) t.start() ---------------------执行----------------------- Hello wolrd at Mon May 28 11:48:12 2018 Hello wolrd at Mon May 28 11:48:12 2018 Hello wolrd at Mon May 28 11:48:12 2018 Hello wolrd at Mon May 28 11:48:14 2018 Hello wolrd at Mon May 28 11:48:14 2018 Hello wolrd at Mon May 28 11:48:14 2018 Hello wolrd at Mon May 28 11:48:16 2018 Hello wolrd at Mon May 28 11:48:16 2018 Hello wolrd at Mon May 28 11:48:16 2018 Hello wolrd at Mon May 28 11:48:18 2018 Hello wolrd at Mon May 28 11:48:18 2018 Hello wolrd at Mon May 28 11:48:18 2018 # 每次最多有三个线程执行,防止线程产生过多阻塞操作系统
timer锁
多久之后执行代码:
#!/usr/bin/env python #coding:utf-8 import threading, time def hello(): print "Hello wolrd at %s" % time.ctime() time.sleep(2) t = threading.Timer(5.0, hello) # 5s之后执行hello函数 print "before starting %s" % time.ctime() t.start() ----------------------执行---------------------- before starting Mon May 28 11:52:14 2018 Hello wolrd at Mon May 28 11:52:19 2018
上面所有的实例,都是各个线程独自运行。下面,我们考虑两个线程之间的交互。
Events
threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。
- Python通过threading.Event()产生一个event对象。event对象维护一个内部标志(标志初始值为False),通过set()将其置为True。
- wait(timeout)则用于堵塞线程直至Flag被set(或者超时,可选的)。
- isSet()用于查询标志位是否为True。
- Clear()则用于清除标志位(使之为False)。
一个线程之间通信的实例:
import threading import time event = threading.Event() def lighter(): count = 0 while True: if count < 5: event.set() print("