1、概念
线程、进程
进程
一个程序,它是一组资源的集合
一个进程里面默认是有一个线程的,主线程
多进程是可以利用多核cpu的
线程
最小的执行单位
线程和线程之间是互相独立的
主线程等待子线程执行结束
线程和线程之间,数据是共享的。
守护线程:
只要主线程执行完成,不管子线程有没有执行完成,全部都结束
一个电脑有几核CPU就只能同时运行几个任务,比如4核CPU只能同时运行4个线程
我们在操作电脑时,感觉是同时运行多个任务,是因为CPU的运算速度很快,有上下文切换,我们感觉不到
python里的多线程利用不了多核CPU,比如我的电脑是8核的CPU,起100个线程,这100个线程都是在一个CPU里面执行,其他7个CPU是空闲的
因为线程之间数据是共享的,同时来处理数据会乱,GLI全局解释器锁,保证线程都在同一个CPU上运行
多进程可以利用多核CPU
CPU密集型任务,用多进程-->消耗CPU比较多
IO(磁盘IO,网络IO)密集型任务,用多线程-->消耗IO比较多
#1、多线程,线程之间数据是共享的
#2、多进程,进程之间数据是独立的
2、多线程
2.1、多线程代码
串行的方式是执行完一个,再接着执行第二个
多线程是同时启用多个线程去操作
import random import threading import time def clean(): print('打扫卫生') time.sleep(2) def xiyifu(): print('洗衣服') time.sleep(1) def cook(): print('做饭') time.sleep(3) def export_data(db,excel): print(threading.current_thread()) print('export_data %s %s' % (db, excel)) time.sleep(random.randint(1, 5)) # 单线程 start_time = time.time() clean() xiyifu() cook() end_time = time.time() print('单线程或串行的运行时间', end_time - start_time) # 多线程 start_time = time.time() t = threading.Thread(target=clean) # 这里只写函数名称 t2 = threading.Thread(target=xiyifu) t3 = threading.Thread(target=cook) t.start() t2.start() t3.start() # 以下这种是并行执行,如果每个子线程启动后就调用join方法,就变成了串行,不可取 t.join() # 主线程等待子线程 t2.join() t3.join() end_time = time.time() print('多线程并行的运行时间', end_time - start_time) time.sleep(4) # 过了4s后,后面开启的线程执行完毕就没有了 print(threading.active_count()) # 当前的线程数
执行结果
2.2、多线程的时间统计
import random import threading import time def clean(): print('打扫卫生') time.sleep(2) def xiyifu(): print('洗衣服') time.sleep(1) def cook(): print('做饭') time.sleep(3) def export_data(db,excel): print(threading.current_thread()) print('export_data %s %s' % (db, excel)) time.sleep(random.randint(1, 5)) # 多线程 start_time = time.time() t = threading.Thread(target=clean) # 这里只写函数名称 t2 = threading.Thread(target=xiyifu) t3 = threading.Thread(target=cook) t.start() t2.start() t3.start() end_time = time.time() print('多线程并行的运行时间', end_time - start_time)
执行结果
正常执行应该是6秒多一点,这里是因为只是主线程执行的时间,没有计算子线程执行的时间,如何解决该问题?
两种方法:
# 方法一:等待子线程执行结束,把启动的子线程放到list中,在循环调用t.join thread_list = [] for i in range(10): t = threading.Thread(target=export_data) thread_list.append(t) t.start() for t in thread_list: t.join() print('线程都运行完了') # 方法二:等待子线程执行结束,通过判断当前线程数 for i in range(10): t = threading.Thread(target=export_data, args=['db1', 'a.xlsx']) # 传参 t.start() while threading.active_count() != 1: pass print('线程都运行完了')
2.3、多线程传参
可以用数组的方式来传参,args=['lxy']
import threading import requests from day09.ketanglianxi_09 import zidonghuayilai_instal def down_load_pic(url): r = requests.get(url) file_name = zidonghuayilai_instal.InstallRequrie.md5(url) + '.jpg' with open(file_name, 'wb') as fw: fw.write(r.content) urls = [ 'https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=3353166494,2700282750&fm=26&gp=0.jpg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159808&di=7a97fdd275ec4e0fc4ab54bdb8a2e703&imgtype=0&src=http%3A%2F%2Fwww.pc6.com%2Fup%2F2011-12%2F201112918444441530.jpg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159808&di=0ae20fe3bd8759c059cb4432938e4062&imgtype=0&src=http%3A%2F%2F5b0988e595225.cdn.sohucs.com%2Fimages%2F20181209%2F38467a58f9264ca68eefa37719b4b739.jpeg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159807&di=5040439b916279a7106a7660b7e0168a&imgtype=0&src=http%3A%2F%2Fimg1.cache.netease.com%2Fcatchpic%2FE%2FE5%2FE5DD0A8099E2D28226465C6894F7A7A1.jpg' ] for url in urls: t = threading.Thread(target=down_load_pic, args=[url]) t.start() while threading.active_count() != 1: pass print('所有图片下载完毕')
2.4、多线程获取函数返回值
多线程运行函数时,是没有办法获取到函数的返回值,所以可以定义一个全局的list,把函数的返回结果存到list就可以了
case_result = [] def run_case(case_name): print('run case over...') case_result.append({case_name,'success'})
2.5、守护线程
守护线程,一旦主线程死掉,不管守护线程有没有执行完成,守护线程全部都结束
import random import threading import time def talk(name): print('正在和%s聊天' % name) time.sleep(random.randint(1, 5)) print('和%s聊完了' % name) t = threading.Thread(target=talk, args=['lhy']) t.setDaemon(True) # 设置成守护线程 t.start() t = threading.Thread(target=talk, args=['xiaohei']) t.setDaemon(True) t.start() t = threading.Thread(target=talk, args=['xiaobai']) t.setDaemon(True) t.start() # 等待所有子线程都执行完 # while threading.active_count() != 1: # pass print('聊完了') # 未等子进程执行完,主线程就执行完了,那么守护线程也立马结束
2.6、线程锁
多个线程同时操作同一个数据时,会有问题,这个时候需要用到线程锁
线程锁需要设置锁定时长,数据操作完成后,需要解锁,不然其他线程会进入无线等待
import threading count = 0 lock = threading.Lock() def add(): global count for i in range(1000000): # 锁的第一种写法 如果忘记写解锁或锁未释放,就会造成死锁 lock.acquire() # 加锁 count += 1 lock.release() # 解锁 #第二种写法 # with lock: # count += 1 for i in range(2): t = threading.Thread(target=add) t.start() while threading.active_count() != 1: pass print(count)
2.7 线程池
import threading import requests import threadpool from day09.ketanglianxi_09 import zidonghuayilai_instal def down_load_pic(url): print(threading.current_thread()) r = requests.get(url) file_name = zidonghuayilai_instal.InstallRequrie.md5(url) + '.jpg' with open(file_name, 'wb') as fw: fw.write(r.content) urls = [ 'https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=3353166494,2700282750&fm=26&gp=0.jpg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159808&di=7a97fdd275ec4e0fc4ab54bdb8a2e703&imgtype=0&src=http%3A%2F%2Fwww.pc6.com%2Fup%2F2011-12%2F201112918444441530.jpg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159808&di=0ae20fe3bd8759c059cb4432938e4062&imgtype=0&src=http%3A%2F%2F5b0988e595225.cdn.sohucs.com%2Fimages%2F20181209%2F38467a58f9264ca68eefa37719b4b739.jpeg', 'https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1603003159807&di=5040439b916279a7106a7660b7e0168a&imgtype=0&src=http%3A%2F%2Fimg1.cache.netease.com%2Fcatchpic%2FE%2FE5%2FE5DD0A8099E2D28226465C6894F7A7A1.jpg' ] pool = threadpool.ThreadPool(3) reqs = threadpool.makeRequests(down_load_pic, urls) # 让它给子线程分配数据 [pool.putRequest(req) for req in reqs] # 上面一行代码等同于下面2行代码 for req in reqs: pool.putRequest(req) pool.wait() # 等待子线程执行结束
3、多进程
import multiprocessing import time lock = multiprocessing.Lock() # 进程锁 lock.acquire()# 加锁 lock.release()# 解锁 def make_money(): print('开始赚钱') time.sleep(10) def star_process(): for i in range(5): p = multiprocessing.Process(target=make_money) # 如果传参,p = multiprocessing.Process(target=make_money,args=[]) p.start() # 等待子进程执行完毕 while len(multiprocessing.active_children()) != 1: pass print('运行结束') # p = multiprocessing.Process(target=make_money) # p.start() # star_process() # 必须写在__name__ == '__main__'中 if __name__ == '__main__': star_process()
4、队列
# 队列 和list差不多 import queue import random import threading import time orders_q = queue.Queue() # 生产者/消费者模式 def producer(): for i in range(100): order_id = random.randint(1, 99999) print('订单生成,orderid=%d' % order_id) orders_q.put(order_id) time.sleep(1) def consumer(): while True: if orders_q.qsize() > 0: order_id = orders_q.get() print('consumer1--订单落库', order_id) def consumer2(): while True: if orders_q.qsize() > 0: order_id = orders_q.get() print('consumer2--订单落库', order_id) t = threading.Thread(target=producer) t.start() t = threading.Thread(target=consumer) t.start() t = threading.Thread(target=consumer2)