线程_apply堵塞式
'''
创建三个进程,让三个进程分别执行功能,关闭进程
Pool 创建 ,apply执行 , close,join 关闭进程
'''
from multiprocessing import Pool
import os,time,random
def worker(msg):
# 创建一个函数,用来使进程进行执行
time_start = time.time()
print("%s 号进程开始执行,进程号为 %d"%(msg,os.getpid()))
# 使用os.getpid()获取子进程号
# os.getppid()返回父进程号
time.sleep(random.random()*2)
time_end = time.time()
print(msg,"号进程执行完毕,耗时%0.2f"%(time_end-time_start))
# 计算运行时间
if __name__ == '__main__':
po = Pool(3)#创建三个进程
print("进程开始")
for i in range(3):
# 使用for循环,运行刚刚创建的进程
po.apply(worker,(i,))#进程池调用方式apply堵塞式
# 第一个参数为函数名,第二个参数为元组类型的参数(函数运行会用到的形参)
#只有当进程执行完退出后,才会新创建子进程来调用请求
po.close()# 关闭进程池,关闭后po不再接收新的请求
# 先使用进程的close函数关闭,后使用join函数进行等待
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("进程结束")
'''创建->apply应用->close关闭->join等待结束'''
线程_FIFO队列实现生产者消费者
import threading # 导入线程库
import time
from queue import Queue # 队列
class Producer(threading.Thread):
# 线程的继承类,修改 run 方法
def run(self):
global queue
count = 0
while True:
if queue.qsize() <1000:
for i in range(100):
count = count + 1
msg = '生成产品'+str(count)
queue.put(msg)#向队列中添加元素
print(msg)
time.sleep(1)
class Consumer(threading.Thread):
# 线程的继承类,修改 run 方法
def run(self):
global queue
while True:
if queue.qsize() >100 :
for i in range(3):
msg = self.name + '消费了' + queue.get() #获取数据
# queue.get()获取到数据
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
# 创建一个队列
for i in range(500):
queue.put('初始产品'+str(i))
# 在 queue 中放入元素 使用 put 函数
for i in range(2):
p = Producer()
p.start()
# 调用Producer类的run方法
for i in range(5):
c = Consumer()
c.start()
线程_GIL最简单的例子
#解决多进程死循环
import multiprocessing
def deadLoop():
while True:
print("Hello")
pass
if __name__ == '__main__':
# 子进程死循环
p1 = multiprocessing.Process(target=deadLoop)
p1.start()
# 主进程死循环
deadLoop()
线程_multiprocessing实现文件夹copy器
import multiprocessing
import os
import time
import random
def copy_file(queue,file_name,source_folder_name,dest_folder_name):
f_read = open(source_folder_name+"/"+file_name,"rb")
f_write = open(source_folder_name+"/"+file_name,"wb")
while True:
time.sleep(random.random())
content = f_read.read(1024)
if content:
f_write.write(content)
else:
break
f_read.close()
f_write.close()
# 发送已经拷贝完毕的文件名字
queue.put(file_name)
def main():
# 获取要复制的文件夹
source_folder_name = input("请输入要复制的文件夹名字:")
# 整理目标文件夹
dest_folder_name = source_folder_name + "副本"
# 创建目标文件夹
try:
os.mkdir(dest_folder_name)#创建文件夹
except:
pass
# 获取这个文件夹中所有的普通文件名
file_names = os.listdir(source_folder_name)
# 创建Queue
queue = multiprocessing.Manager().Queue()
# 创建线程池
pool = multiprocessing.Pool(3)
for file_name in file_names:
# 向线程池中添加任务
pool.apply_async(copy_file,args=(queue,file_name,source_folder_name,dest_folder_name))#不堵塞执行
# 主进程显示进度
pool.close()
all_file_num = len(file_names)
while True:
file_name = queue.get()
if file_name in file_names:
file_names.remove(file_name)
copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num
print("
%.2f...(%s)" % (copy_rate, file_name) + " " * 50, end="")
if copy_rate >= 100:
break
print()
if __name__ == "__main__":
main()
线程_multiprocessing异步
from multiprocessing import Pool
import time
import os
def test():
print("---进程池中的进程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
for i in range(3):
print("----%d---"%i)
time.sleep(1)
return "hahah"
def test2(args):
print("---callback func--pid=%d"%os.getpid())
print("---callback func--args=%s"%args)
if __name__ == '__main__':
pool = Pool(3)
pool.apply_async(func=test,callback=test2)
# 异步执行
time.sleep(5)
print("----主进程-pid=%d----"%os.getpid())
线程_Process实例
from multiprocessing import Process
import os
from time import sleep
def run_proc(name,age,**kwargs):
for i in range(10):
print("子进程运行中,名字为 = %s,年龄为 = %d,子进程 = %d..."%(name,age,os.getpid()))
print(kwargs)
sleep(0.5)
if __name__ == '__main__':
print("父进程: %d"%(os.getpid()))
pro = Process(target=run_proc,args=('test',18),kwargs={'kwargs':20})
print("子进程将要执行")
pro.start( )
sleep(1)
pro.terminate()#将进程进行终止
pro.join()
print("子进程已结束")
from multiprocessing import Process
import time
import os
#两个子进程将会调用的两个方法
def work_1(interval):
# intercal为挂起时间
print("work_1,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
start_time = time.time()
time.sleep(interval)
end_time = time.time()
print("work_1,执行时间为%f"%(end_time-start_time))
def work_2(interval):
print("work_2,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
start_time = time.time()
time.sleep(2)
end_time = time.time()
print("work_2执行时间为:%.2f"%(end_time-start_time))
if __name__ == '__main__':
print("进程Id:", os.getpid())
pro1 = Process(target=work_1, args=(2,))
pro2 = Process(target=work_2, name="pro2", args=(3,))
pro1.start()
pro2.start()
print("pro2.is_alive:%s" % (pro2.is_alive()))
print("pro1.name:", pro1.name)
print("pro1.pid=%s" % pro1.pid)
print("pro2.name=%s" % pro2.name)
print("pro2.pid=%s" % pro2.pid)
pro1.join()
print("pro1.is_alive:", pro1.is_alive())
线程_Process基础语法
"""
Process([group[,target[,name[,args[,kwargs]]]]])
group:大多数情况下用不到
target:表示这个进程实例所调用的对象 target=函数名
name:为当前进程实例的别名
args:表示调用对象的位置参数元组 args=(参数,)
kwargs:表示调用对象的关键字参数字典
"""
"""
常用方法:
is_alive( ):判断进程实例是否还在执行
join([timeout]):是否等待进程实例执行结束或等待多少秒
start():启动进程实例(创建子进程)
run():如果没有给定target函数,对这个对象调用start()方法时,
就将执行对象中的run()方法
terminate():不管任务是否完成,立即停止
"""
"""
常用属性:
name:当前进程实例的别名,默认为Process-N,N从1开始
pid:当前进程实例的PID值
"""
线程_ThreadLocal
import threading
# 创建ThreadLocal对象
house = threading.local()
def process_paper():
user = house.user
print("%s是房子的主人,in %s"%(user,threading.current_thread().name))
def process_thread(user):
house.user = user
process_paper()
t1 = threading.Thread(target=process_thread,args=('Xiaoming',),name='佳木斯')
t2 = threading.Thread(target=process_thread,args=('Hany',),name='哈尔滨')
t1.start()
t1.join()
t2.start()
t2.join()
线程_互斥锁_Lock及fork创建子进程
"""
创建锁 mutex = threading.Lock()
锁定 mutex.acquire([blocking])
当blocking为True时,当前线程会阻塞,直到获取到这个锁为止
默认为True
当blocking为False时,当前线程不会阻塞
释放 mutex.release()
"""
from threading import Thread,Lock
g_num = 0
def test1():
global g_num
for i in range(100000):
mutexFlag = mutex.acquire(True)#通过全局变量进行调用函数
# True会发生阻塞,直到结束得到锁为止
if mutexFlag:
g_num += 1
mutex.release()
print("test1--g_num = %d"%(g_num))
def test2():
global g_num
for i in range(100000):
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("----test2---g_num = %d "%(g_num))
mutex = Lock()
p1 = Thread(target=test1,)
# 开始进程
p1.start()
p2 = Thread(target=test2,)
p2.start()
print("----g_num = %d---"%(g_num))
fork创建子进程
import os
# fork()在windows下不可用
pid = os.fork()#返回两个值
# 操作系统创建一个新的子进程,复制父进程的信息到子进程中
# 然后父进程和子进程都会得到一个返回值,子进程为0,父进程为子进程的id号
if pid == 0:
print("哈哈1")
else:
print("哈哈2")
线程_gevent实现多个视频下载及并发下载
from gevent import monkey
import gevent
import urllib.request
#有IO操作时,使用patch_all自动切换
monkey.patch_all()
def my_downLoad(file_name, url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
# 使用库打开网页
data = resp.read()
with open(file_name, "wb") as f:
f.write(data)
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(my_downLoad, "1.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-08-%E3%80%90%E7%90%86%E8%A7%A3%E3%80%91%E5%87%BD%E6%95%B0%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93%EF%BC%88%E4%B8%80%EF%BC%89.mp4'),
gevent.spawn(my_downLoad, "2.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-03-%E3%80%90%E6%8E%8C%E6%8F%A1%E3%80%91%E6%97%A0%E5%8F%82%E6%95%B0%E6%97%A0%E8%BF%94%E5%9B%9E%E5%80%BC%E5%87%BD%E6%95%B0%E7%9A%84%E5%AE%9A%E4%B9%89%E3%80%81%E8%B0%83%E7%94%A8%28%E4%B8%8B%29.mp4'),
])
from gevent import monkey
import gevent
import urllib.request
# 有耗时操作时需要
monkey.patch_all()
def my_downLoad(url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(my_downLoad, 'http://www.baidu.com/'),
gevent.spawn(my_downLoad, 'http://www.itcast.cn/'),
gevent.spawn(my_downLoad, 'http://www.itheima.com/'),
])
线程_gevent自动切换CPU协程
import gevent
def f(n):
for i in range(n):
print (gevent.getcurrent(), i)
# gevent.getcurrent() 获取当前进程
g1 = gevent.spawn(f, 3)#函数名,数目
g2 = gevent.spawn(f, 4)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()
import gevent
def f(n):
for i in range(n):
print (gevent.getcurrent(), i)
#用来模拟一个耗时操作,注意不是time模块中的sleep
gevent.sleep(1)
g1 = gevent.spawn(f, 2)
g2 = gevent.spawn(f, 3)
g3 = gevent.spawn(f, 4)
g1.join()
g2.join()
g3.join()
import gevent
import random
import time
def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())
gevent.joinall([
# 添加可以切换的协程
gevent.spawn(coroutine_work, "work0"),
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])
from gevent import monkey
import gevent
import random
import time
# 有耗时操作时需要
monkey.patch_all()#自动切换协程
# 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())
gevent.joinall([
gevent.spawn(coroutine_work, "work"),
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])
线程_使用multiprocessing启动一个子进程及创建Process 的子类
from multiprocessing import Process
import os
# 子进程执行的函数
def run_proc(name):
print("子进程运行中,名称:%s,pid:%d..."%(name,os.getpid()))
if __name__ == "__main__":
print("父进程为:%d..."%(os.getpid()))
# os.getpid()获取到进程名
pro = Process(target=run_proc,args=('test',))
# target=函数名 args=(参数,)
print("子进程将要执行")
pro.start()#进程开始
pro.join()#添加进程
print("子进程执行结束...")
from multiprocessing import Process
import time
import os
# 继承Process类
class Process_Class(Process):
def __init__(self,interval):
Process.__init__(self)
self.interval = interval
# 重写Process类的run方法
def run(self):
print("我是类中的run方法")
print("子进程(%s),开始执行,父进程为(%s)"%(os.getpid(),os.getppid()))
start_time = time.time()
time.sleep(2)
end_time = time.time()
print("%s执行时间为:%.2f秒" % (os.getpid(),end_time-start_time))
if __name__ == '__main__':
start_time = time.time()
print("当前进程为:(%s)"%(os.getpid()))
pro1 = Process_Class(2)
# 对一个不包含target属性的Process类执行start()方法,
# 会运行这个类中的run()方法,所以这里会执行p1.run()
pro1.start()
pro1.join()
end_time = time.time()
print("(%s)执行结束,耗时%0.2f" %(os.getpid(),end_time - start_time))
线程_共享全局变量(全局变量在主线程和子线程中不同)
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----在work1函数中,g_num 是 %d "%(g_num))
def work2():
global g_num
print("在work2中,g_num为 %d "%(g_num))
if __name__ == '__main__':
print("---线程创建之前 g_num 是 %d"%(g_num))
t1 = Thread(target=work1)
t1.start()
t2 = Thread(target=work2)
t2.start()
线程_多线程_列表当做实参传递到线程中
from threading import Thread
def work1(nums):
nums.append('a')
print('---在work1中---',nums)
def work2(nums):
print("-----在work2中----,",nums)
if __name__ == '__main__':
g_nums = [1,2,3]
t1 = Thread(target=work1,args=(g_nums,))
# target函数,args参数
t1.start()
t2 = Thread(target=work2,args=(g_nums,))
t2.start()
线程_threading合集
# 主线程等待所有子线程结束才结束
import threading
from time import sleep,ctime
def sing():
for i in range(3):
print("正在唱歌---%d"%(i))
sleep(2)
def dance():
for i in range(3):
print("正在跳舞---%d" % (i))
sleep(2)
if __name__ == '__main__':
print("----开始----%s"%(ctime()))
t_sing = threading.Thread(target=sing)
t_dance = threading.Thread(target=dance)
t_sing.start()
t_dance.start()
print("----结束----%s"%(ctime()))
#查看线程数量
import threading
from time import sleep,ctime
def sing():
for i in range(3):
print("正在唱歌---%d"%i)
sleep(1)
def dance():
for i in range(3):
print("正在跳舞---%d"%i)
sleep(i)
if __name__ == '__main__':
t_sing = threading.Thread(target=sing)
t_dance = threading.Thread(target=dance)
t_sing.start()
t_dance.start()
while True:
length = len(threading.enumerate())
print("当前运行的线程数为:%d"%(length))
if length<= 1:
break
sleep(0.5)
import threading
import time
class MyThread(threading.Thread):
# 重写 构造方法
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.num = num
# 类实例不同,num值不同
self.sleepTime = sleepTime
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('线程(%s),num=%d' % (self.name, self.num))
if __name__ == '__main__':
mutex = threading.Lock()
t1 = MyThread(100, 3)
t1.start()
t2 = MyThread(200, 1)
t2.start()
import threading
from time import sleep
g_num = 1
def test(sleepTime):
num = 1 #num为局部变量
sleep(sleepTime)
num += 1
global g_num #g_num为全局变量
g_num += 1
print('---(%s)--num=%d --g_num=%d' % (threading.current_thread(), num,g_num))
t1 = threading.Thread(target=test, args=(3,))
t2 = threading.Thread(target=test, args=(1,))
t1.start()
t2.start()
import threading
import time
class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire():
print("A上锁了")
mutexA.release()
time.sleep(2)
if mutexB.acquire():
print("B上锁了")
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire():
print("B上锁了")
mutexB.release()
time.sleep(2)
if mutexA.acquire():
print("A上锁了")
mutexA.release()
mutexB.release()
# 先看B是否上锁,然后看A是否上锁
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == "__main__":
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
多线程threading的执行顺序(不确定)
# 只能保证都执行run函数,不能保证执行顺序和开始顺序
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm "+self.name+' @ '+str(i)
print(msg)
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == '__main__':
test()
多线程threading的注意点
import threading
import time
class MyThread(threading.Thread):
# 重写threading.Thread类中的run方法
def run(self):
for i in range(3):#开始线程之后循环三次
time.sleep(1)
msg = "I'm "+self.name+'@'+str(i)
# name属性是当前线程的名字
print(msg)
if __name__ == '__main__':
t = MyThread()#使用threading.Thread的继承类
t.start()#继承线程之后要开始运行 start方法
线程_进程间通信Queue合集
# Queue的工作原理
from multiprocessing import Queue
q = Queue(3)#初始化一个Queue对象,最多可接收3条put消息
q.put("Info1")
q.put("Info2")
print("q是否满了",q.full())#查看q是否满了
q.put("Info3")
print("q是否满了",q.full())
try:
q.put_nowait("info4")
except:
print("消息列队已经满了,现有消息数量为:%s"%(q.qsize()))
# 使用q.qsize()查看数量
# 先验证是否满了,再写入
if not q.full():
q.put_nowait("info4")
# 读取信息时,先判断消息列队是否为空,再读取
if not q.empty():
print("开始读取")
for i in range(q.qsize()):
print(q.get_nowait())
from multiprocessing import Queue
from multiprocessing import Process
import os,time,random
def write(q):
for value in ['a','b','c']:
print("Put %s to q ..."%(value))
q.put(value)
time.sleep(random.random())
def read(q):
while True:
if not q.empty():
value = q.get(True)
print("Get %s from Queue..."%(value))
time.sleep(random.random())
else:
break
if __name__ == '__main__':
#父进程创建Queue,传给各个子进程
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
pw.start()
# 等待pw结束
pw.join()
pr.start()
pr.join()
print("数据写入读写完成")
from multiprocessing import Manager,Pool
import os,time,random
# 名称为reader 输出子进程和父进程 os 输出q的信息
def reader(q):
print("reader启动,子进程:%s,父进程:%s"%(os.getpid(),os.getppid()))
for i in range(q.qsize()):#在0 ~ qsize范围内
print("获取到queue的信息:%s"%(q.get(True)))
def writer(q):
print("writer启动,子进程:%s,父进程:%s"%(os.getpid(),os.getppid()))
for i in "HanYang":#需要写入到 q 的数据
q.put(i)
if __name__ == '__main__':
print("%s 开始 "%(os.getpid()))
q = Manager().Queue()#Queue使用multiprocessing.Manager()内部的
po = Pool()#创建一个线程池
po.apply(writer,(q,))#使用apply阻塞模式
po.apply(reader,(q,))
po.close()#关闭
po.join()#等待结束
print("(%s) 结束"%(os.getpid()))
线程_进程池
from multiprocessing import Pool
import os,time,random
def worker(msg):
start_time = time.time()
print("(%s)开始执行,进程号为(%s)"%(msg,os.getpid()))
time.sleep(random.random()*2)
end_time = time.time()
print(msg,"(%s)执行完毕,执行时间为:%.2f"%(os.getpid(),end_time-start_time))
if __name__ == '__main__':
po = Pool(3)#定义一个进程池,最大进程数为3
for i in range(0,6):
po.apply_async(worker,(i,))
# 参数:函数名,(传递给目标的参数元组)
# 每次循环使用空闲的子进程调用函数,满足每个时刻都有三个进程在执行
print("---开始---")
po.close()
po.join()
print("---结束---")
"""
multiprocessing.Pool的常用函数:
apply_async(func[,args[,kwds]]):
使用非阻塞方式调用func,并行执行
args为传递给func的参数列表
kwds为传递给func的关键字参数列表
apply(func[,args[,kwds]])
使用堵塞方式调用func
堵塞方式:必须等待上一个进程退出才能执行下一个进程
close()
关闭Pool,使其不接受新的任务
terminate()
无论任务是否完成,立即停止
join()
主进程堵塞,等待子进程的退出
注:必须在terminate,close函数之后使用
"""
线程_可能发生的问题
from threading import Thread
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
# time.sleep(3)
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
内存泄漏
import gc
class ClassA():
def __init__(self):
print('对象产生 id:%s'%str(hex(id(self))))
def f2():
while True:
c1 = ClassA()
c2 = ClassA()
c1.t = c2#引用计数变为2
c2.t = c1
del c1#引用计数变为1 0才进行回收
del c2
#把python的gc关闭
gc.disable()
f2()
'''
创建三个进程,让三个进程分别执行功能,关闭进程
Pool 创建 ,apply执行 , close,join 关闭进程
'''
from multiprocessing import Pool
import os,time,random
def worker(msg):
# 创建一个函数,用来使进程进行执行
time_start = time.time()
print("%s 号进程开始执行,进程号为 %d"%(msg,os.getpid()))
# 使用os.getpid()获取子进程号
# os.getppid()返回父进程号
time.sleep(random.random()*2)
time_end = time.time()
print(msg,"号进程执行完毕,耗时%0.2f"%(time_end-time_start))
# 计算运行时间
if __name__ == '__main__':
po = Pool(3)#创建三个进程
print("进程开始")
for i in range(3):
# 使用for循环,运行刚刚创建的进程
po.apply(worker,(i,))#进程池调用方式apply堵塞式
# 第一个参数为函数名,第二个参数为元组类型的参数(函数运行会用到的形参)
#只有当进程执行完退出后,才会新创建子进程来调用请求
po.close()# 关闭进程池,关闭后po不再接收新的请求
# 先使用进程的close函数关闭,后使用join函数进行等待
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("进程结束")
'''创建->apply应用->close关闭->join等待结束'''
2020-05-07