python线程
线程的创建
多线程类似于同时执行多个不同程序,多线程有以下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理
- 程序的运行速度可能加快
- 在一些IO密集型操作中,线程就比较有用。可以释放上些内存占用
python3中使用Threading模块提供线程相关的操作。
#!/usr/bin/env python3
import threading
import time
class Mythread(threading.Thread):
def __init__(self, func, arg):
threading.Thread.__init__(self)
self.func = func
self.arg = arg
def run(self):
self.func(self.arg)
def f1(x):
time.sleep(1)
print(x)
if __name__ == '__main__':
for i in range(10):
t = Mythread(f1, i)
t.start()
上面的代码创建了十个线程,然后提交给CPU,让CPU根据指定算法去调度执行。
- getName 获取线程名
- setNmae 设置线程名
- setDaemon 设置线程为前台还是后台,如果线程是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后才退出程序。如果线台为后台,则主线程执行完毕后无论后台线程是否执行完毕,程序均会退出
- join 逐个执行每个线程,待执先完毕后主线程才继续往下执行
线程锁
互斥锁
由于线程之间是随机调度的,当多个线程同时修改同一条数据时可能会导致数据混乱,因此,出线了线程锁,这样同上时刻只允许一个线程操作。
#!/usr/bin/env python3
import threading
import time
li = [1, 2, 3]
def fun(x, y):
x.append(y,)
time.sleep(1)
print(x)
if __name__ == '__main__':
for i in range(4, 10):
t = threading.Thread(target=fun, args=(li, i,))
t.start()
以上代码中,先定义了一个列表li,并发产生了6个线程,每个线程往列表中添加一个数字并打印列表li,理论它的输出应该是这样的:
[1, 2, 3, 4]
[1, 2, 3, 4, 6]
[1, 2, 3, 4, 6, 5]
[1, 2, 3, 4, 6, 5, 8]
[1, 2, 3, 4, 6, 5, 8, 7]
[1, 2, 3, 4, 6, 5, 8, 7, 9]
实际上它的输出确是这样的:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
出现这种情况的原因就是同一时刻多个线程操作同一个数据造成的。下面我们给程序加上线程锁:
#!/usr/bin/env python3
import threading
import time
li = [1, 2, 3]
lock = threading.Lock() #创建锁
def fun(x, y):
lock.acquire() #锁定
x.append(y,)
time.sleep(1)
print(x)
lock.release() #释放
if __name__ == '__main__':
for i in range(4, 10):
t = threading.Thread(target=fun, args=(li, i,))
t.start()
输出结果如下:
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
信号量(Semaphore)
互斥锁同时只允许一个线程更改数据,而信号量同时可允许一定数量的线程更改数据。
互斥锁用于线程的互斥,信号量用于线程的同步,这是互斥锁与信号量最根本的差别。
Semaphore管理了一个内置的计数器,每当调用acquire()时内置计数器 -1,调用 release()时内置计数器 +1,当计数器为 0 时,acquire()将阻塞线程直到其他线程调用 release().
实例:(每次只有5个线程可以同时执行)
#!/usr/bin/env python3
import threading
import time
num = 0
def f1(i):
global num
sem.acquire()
num += 1
time.sleep(1)
print(num,i)
sem.release()
if __name__ == '__main__':
sem = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=f1,args=(i,))
t.start()
事件(event)
event是由线程设置的信号标志,如果信号标志为真,则其他线程等待直到信号结束。
- set() 设置信号,使用Event的set() 方法可以设置对象内部的信号标置为真
- clear() 清除信号,使用Event的 clear() 方法可以清除Event对象内部的信号标志
- wait() 等待,Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。
实例:
#!/usr/bin/env python3
import threading
import time
def f(i):
print('start')
event.wait()
print(i)
print('end')
event = threading.Event()
if __name__ == '__main__':
for i in range(5):
t = threading.Thread(target=f,args=(i,))
t.start()
event.clear()
inp = input('input your choose:')
if inp:
event.set()
Python进程
简单的创建进程
python的进程一般用 multiprocessing模块,他与threading模块很像,对多核CPU的利用率会比 threading好的多。
实例:(简单的创建进程)
#!/usr/bin/env python3
import multiprocessing
import time
def func(i):
time.sleep(2)
print(i)
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=func,args=(i,))
p.start()
进程这间默认是无法共享数据的:
#!/usr/bin/env python3
import multiprocessing
import time
li = []
def func(i):
li.append(i)
time.sleep(2)
print(li)
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=func,args=(i,),name='mywork')
p.start()
输出如下:
[0]
[3]
[1]
[9]
[7]
[6]
[8]
[5]
[2]
[4]
python数据共享的方法
Python中进程间共享数据,除了基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口.Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
- Array 数组
#!/usr/bin/env python3
from multiprocessing import Array,Process
import time
arry = Array('i',[11,22,33,44,55,66,77,88,99])
def func(i,arry):
arry[i] += 100
print(list(arry))
if __name__ == '__main__':
for i in range(8):
p = Process(target=func,args=(i,arry,))
p.start()
- manage.dict() 字典
#!/usr/bin/env python3
import time
from multiprocessing import Process,Manager
def func(d,i):
d[i] = i
time.sleep(0)
print(d)
if __name__ == '__main__':
d = Manager().dict()
for i in range(2):
p = Process(target=func,args=(d,i,))
p.start()
p.join()
- queue
#!/usr/bin/env python3
from multiprocessing import Process,Queue
import time
def func(q,i):
time.sleep(2)
print(i,q.get())
if __name__ == '__main__':
q = Queue()
q.put('h1')
q.put('h2')
q.put('h3')
for i in range(3):
p = Process(target=func,args=(q,i,))
p.start()
进程锁实例:
#!/usr/bin/env python3
from multiprocessing import Process,Array,Lock
def func(arry,lock,i):
lock.acquire()
arry[i] = i
for item in arry:
print(item)
lock.release()
if __name__ == '__main__':
lock = Lock()
arry = Array('i',[11,22,33,44])
for i in range(4):
p = Process(target=func,args=(arry,lock,i,))
p.start()
p.join()
进程池与线程池
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
进程池:
#!/usr/bin/env python3
from multiprocessing import Process,Pool
import time
def func(i):
time.sleep(2)
print(i)
if __name__ == '__main__':
pool = Pool(5)
for i in range(30):
p = pool.apply_async(func,args=(i,))
pool.close()
pool.join()
线程池:
#!/usr/bin/env python3
from multiprocessing import pool,Process
import time
def func(i):
time.sleep(2)
print(i)
if __name__ == '__main__':
pool = pool.ThreadPool(5)
for i in range(30):
pool.apply_async(func,args=(i,))
pool.close()
pool.join()