As usual, reference links first:
https://www.cnblogs.com/jiangfan95/p/11439543.html
https://www.liaoxuefeng.com/wiki/1016959663602400/1017629247922688
https://blog.csdn.net/mr__l1u/article/details/81772073
First, a pasted summary of the characteristics of processes and threads.
1> Processes, threads and coroutines:
A process is the independent unit of resource allocation and scheduling in the operating system;
A thread is an entity within a process, and the basic unit of CPU scheduling and dispatch;
A coroutine ("micro-thread") runs inside a thread, carries its own context, and is an even smaller unit of execution than a thread;
2> Differences
A program has at least one process, and a process has at least one thread;
Threads are finer-grained than processes (they own fewer resources), which gives multithreaded programs higher concurrency;
Each process has its own independent memory space, while the threads of a process share memory, which greatly improves efficiency;
A thread cannot run on its own; it must live inside a process;
3> Pros and cons: each has its trade-offs. Threads are cheap to start and switch, but weaker at resource management and protection; processes are the opposite.
4> Coroutines: imagine a process as a Lanzhou noodle restaurant; the waiters who keep the restaurant running are threads, and each table is a task to complete.
With multithreading, the pattern is: every new table of customers gets its own waiter, so as many tables as there are customers, that many waiters are needed.
With coroutines the pattern is different: there is only one waiter. Eating involves ordering and then waiting for the food, so while table A is still ordering, the waiter goes to serve B; when B has ordered and is waiting for the food, the waiter moves on to C; when C is also waiting and A has finished ordering, the waiter hurries back to A... and so on.
From this example: to use coroutines, the tasks must involve waiting. When the work is slow and IO-bound, running it with coroutines saves a lot of resources (one waiter versus many ## a superhuman who can run a whole shop alone, give me a dozen of those (-..-)) and makes much better use of the system.
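The waiter analogy maps directly onto Python coroutines. Below is a minimal sketch using asyncio (one standard way to run coroutines); the table names and sleep times are invented for this illustration. The single event loop is the lone waiter: each `await asyncio.sleep(...)` is a table "waiting for the kitchen", which hands control back so another table can be served.

```python
import asyncio

events = []  # record the order in which things happen


async def serve_table(table, cook_time):
    events.append(f'{table}: order taken')
    await asyncio.sleep(cook_time)   # waiting for the dish; yields control to other tables
    events.append(f'{table}: dish served')


async def main():
    # One "waiter" (the event loop) interleaves all three tables.
    await asyncio.gather(
        serve_table('A', 0.2),
        serve_table('B', 0.1),
        serve_table('C', 0.15),
    )


asyncio.run(main())
print(events)
```

All three orders are taken first (A, B, C, in creation order), then the dishes arrive in cook-time order (B, C, A): one worker, three overlapping waits.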
I think the introduction above is pretty good; the third link leads to the original article.
On to the code. Two different ways to start threads:
```python
import threading
import time
import logging
import random


class MyThreading(threading.Thread):
    # Approach 1: subclass threading.Thread
    def __init__(self, num):
        super(MyThreading, self).__init__()
        self.num = num

    def run(self) -> None:
        time.sleep(random.random())
        logging.debug(str(self.__class__.__name__) + '======>' + 'Work: %s' % self.num)


def worker(num):
    """thread worker function"""
    time.sleep(random.random())
    # threading.current_thread().getName() returns the current thread's name
    logging.debug(threading.current_thread().getName() + ':' + 'Worker: %s' % num)


# Use logging to display the details
logging.basicConfig(level=logging.DEBUG,
                    format="[%(levelname)s] (%(threadName)-10s) %(message)s")

threads = []
for i in range(5):
    my = MyThreading(i)
    my.start()
    t = threading.Thread(target=worker, args=(i,), name='sidian')  # Approach 2: pass a target directly
    t.start()
    threads.append(t)
    threads.append(my)

for i in threads:  # join so the new threads finish before the final log line
    i.join()

logging.info('__main__:' + threading.current_thread().getName())
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t1.py
[DEBUG] (Thread-5 ) MyThreading======>Work: 4
[DEBUG] (sidian ) sidian:Worker: 0
[DEBUG] (Thread-1 ) MyThreading======>Work: 0
[DEBUG] (Thread-4 ) MyThreading======>Work: 3
[DEBUG] (sidian ) sidian:Worker: 4
[DEBUG] (sidian ) sidian:Worker: 2
[DEBUG] (sidian ) sidian:Worker: 1
[DEBUG] (Thread-2 ) MyThreading======>Work: 1
[DEBUG] (sidian ) sidian:Worker: 3
[DEBUG] (Thread-3 ) MyThreading======>Work: 2
[INFO] (MainThread) __main__:MainThread

Process finished with exit code 0
```
From the code, multithreading is written much like multiprocessing.
Next, a small benchmark to look at some differences between threads and processes.
```python
import multiprocessing
import threading
import time

n = 0


def run():
    global n
    for i in range(10000):
        n += i
    return n


def s_time(func):
    def wrap():
        t1 = time.perf_counter()
        res = func()
        print(f'({func.__name__})cost_time:{time.perf_counter()-t1:0.5f}')
        return res
    return wrap


@s_time
def run_threads():
    threads = []
    for i in range(10000):
        t = threading.Thread(target=run)
        t.start()
        threads.append(t)
    for i in threads:
        i.join()


@s_time
def run_process():
    threads = []
    for i in range(10000):
        t = multiprocessing.Process(target=run)
        t.start()
        threads.append(t)
    for i in threads:
        i.join()


run_threads()
run_process()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:8.54433
(run_process)cost_time:20.15610

Process finished with exit code 0
```
Above is the timing comparison for starting ten thousand threads versus ten thousand processes; the threads are clearly much faster.
Below are the results for 1000 and then 100 of each.
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:0.86798
(run_process)cost_time:1.20561

Process finished with exit code 0
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:0.10313
(run_process)cost_time:0.12918

Process finished with exit code 0
```
Overall, spawning threads is clearly much cheaper than spawning processes. (Note the worker here is actually CPU-bound, so what this mainly measures is creation overhead.)
Now a word on daemon threads. Like daemon processes, a daemon thread does not keep the program alive: it ends when the main thread finishes. By default, threads are not daemons.
```python
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s",
)

d = threading.Thread(target=daemon, name='daemon', daemon=True)  # make it a daemon thread
# d.setDaemon(True)  # the other way to do it
t = threading.Thread(target=non_daemon, name='non_daemon')

d.start()
t.start()

d.join(0.1)  # wait at most 0.1s for the daemon; without a timeout, join() blocks until it finishes
logging.info('d.isAlive' + str(d.is_alive()))  # check whether the thread is still running
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t3.py
(daemon ) Starting
(non_daemon) Starting
(non_daemon) Exiting
(MainThread) d.isAliveTrue

Process finished with exit code 0
```
Next, enumerating all live threads, a method I don't find very practical.
```python
import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

main_thread = threading.main_thread()
logging.info(main_thread)            # the main thread
logging.info(threading.enumerate())

for t in threading.enumerate():      # enumerate every live thread
    if t is main_thread:             # skip the main thread
        continue
    logging.debug(f'joining {t.getName()}')  # join all the others
    t.join()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t4.py
[DEBUG] (Thread-1 ) sleeping 0.20
[DEBUG] (Thread-2 ) sleeping 0.50
[DEBUG] (Thread-3 ) sleeping 0.30
[INFO] (MainThread) <_MainThread(MainThread, started 4757224896)>
[INFO] (MainThread) [<_MainThread(MainThread, started 4757224896)>, <Thread(Thread-1, started daemon 123145473859584)>, <Thread(Thread-2, started daemon 123145479114752)>, <Thread(Thread-3, started daemon 123145484369920)>]
[DEBUG] (MainThread) joining Thread-1
[DEBUG] (Thread-1 ) ending
[DEBUG] (MainThread) joining Thread-2
[DEBUG] (Thread-3 ) ending
[DEBUG] (Thread-2 ) ending
[DEBUG] (MainThread) joining Thread-3

Process finished with exit code 0
```
Honestly, why so roundabout? Wouldn't it be just as easy to keep an empty list, append the threads that need joining, and loop over that?
For example:
```python
import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

threads = []
for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()
    threads.append(t)

main_thread = threading.main_thread()
logging.info(main_thread)            # the main thread
logging.info(threading.enumerate())

# for t in threading.enumerate():    # enumerate every live thread
#     if t is main_thread:           # skip the main thread
#         continue
#     logging.debug(f'joining {t.getName()}')  # join all the others
#     t.join()

for t in threads:
    t.join()
```
Isn't that simpler?
Next, an even more obscure one: the timer thread. I really can't figure out where this kind of timer thread is normally used.
```python
import threading
import time
import logging


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

t1 = threading.Timer(0.3, delayed)  # start the worker after 0.3s
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug(f'waiting before canceling {t2.getName()}')
time.sleep(0.2)
logging.debug(f'canceling {t2.getName()}')
t2.cancel()  # cancel the timer before it fires
logging.debug('done')
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_timer.py
[DEBUG] (MainThread) starting timers
[DEBUG] (MainThread) waiting before canceling t2
[DEBUG] (MainThread) canceling t2
[DEBUG] (MainThread) done
[DEBUG] (t1 ) worker running

Process finished with exit code 0
```
From the code, all I can see in Timer is the delayed start and the ability to cancel the task midway; it's hard to picture the use cases.
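For what it's worth, one place Timer does show up in practice is debouncing: restart the timer on every new event and only act once things have been quiet for a while. A hypothetical sketch (the `keystroke`/`on_idle` names are my own invention):

```python
import threading

fired = []


def on_idle():
    fired.append(True)


# Debounce: restart the timer on every new "keystroke"; on_idle only runs
# once input has been quiet for 0.2s.
timer = None


def keystroke():
    global timer
    if timer is not None:
        timer.cancel()               # cancel the pending callback
    timer = threading.Timer(0.2, on_idle)
    timer.start()


for _ in range(3):
    keystroke()                      # rapid events keep cancelling the timer

timer.join()                         # wait for the final timer to fire
print(fired)                         # only the last timer actually ran
```

The first two timers are cancelled before they fire, so `on_idle` runs exactly once.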
Next up is Event, for communication between threads. That said, since threads share data, I think a global variable would also be a decent way to communicate.
```python
import logging
import threading
import time


def wait_for_event(e):
    """wait for the event to be set before doing anything"""
    logging.debug('wait for event starting')  # runs as soon as the thread starts
    event_is_set = e.wait()  # interesting: wait() actually has a return value
    logging.debug(f'event set: {event_is_set}')
    logging.debug(str(e.is_set()))


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():  # the event starts out unset
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)  # block with a timeout; returns whether the event was set
        logging.debug('event set : %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_event.py
[DEBUG] (block ) wait for event starting
[DEBUG] (nonblock ) wait_for_event_timeout starting
[DEBUG] (MainThread) Waiting before calling Event.set()
[DEBUG] (nonblock ) event set : False
[DEBUG] (nonblock ) doing other work
[DEBUG] (nonblock ) wait_for_event_timeout starting
[DEBUG] (MainThread) Event is set
[DEBUG] (block ) event set: True
[DEBUG] (block ) True
[DEBUG] (nonblock ) event set : True
[DEBUG] (nonblock ) processing event

Process finished with exit code 0
```
As the code shows, e.wait() is a very handy method.
Next, locks in multithreaded code. This matters: the material above was about synchronizing threads, whereas here the goal is to control access to a shared resource so data is not corrupted or lost.
Python's built-in data structures (lists, dicts, and so on) are thread-safe for their individual operations, a side effect of Python using atomic bytecodes to manage them (one benefit of the global interpreter lock).
Other data structures implemented in Python, or simpler types such as integers and floats, have no such protection. To guarantee safe concurrent access to an object, use a Lock.
Prefer taking locks with `with`: that way you can't forget to release, which helps prevent deadlock.
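Since plain integers get no protection, a read-modify-write like `counter += 1` has to be guarded. A minimal sketch of wrapping a shared int in `with lock:` (the names here are my own, not from the post):

```python
import threading

counter = 0
lock = threading.Lock()


def add(times):
    global counter
    for _ in range(times):
        with lock:          # the read-modify-write happens as one unit
            counter += 1


threads = [threading.Thread(target=add, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)   # 400000 with the lock; without it the total can come up short
```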
```python
import logging
import threading, time

logging.basicConfig(level=logging.INFO)

# 10 threads -> 100 cups
cups = []
lock = threading.Lock()


def worker(lock=None, task=100):
    while True:
        count = len(cups)  # if several threads all read count == 99 at once, cups overshoots
        if count >= task:
            break
        logging.info(count)
        cups.append(1)
        logging.info("{} make 1........ ".format(threading.current_thread().getName()))
    logging.info("{} ending=======>".format(len(cups)))


for x in range(10):
    threading.Thread(target=worker, args=(lock, 100)).start()
```
```
INFO:root:104 ending=======>
INFO:root:104 ending=======>
INFO:root:Thread-6 make 1........
INFO:root:105 ending=======>
INFO:root:105 ending=======>
INFO:root:105 ending=======>
```
```python
import logging
import threading, time

logging.basicConfig(level=logging.INFO)

# 10 threads -> 100 cups
cups = []
lock = threading.Lock()


def worker(lock=None, task=100):
    with lock:  # with the lock held
        while True:
            count = len(cups)  # now only one thread at a time can read count
            if count >= task:
                break
            logging.info(count)
            cups.append(1)
            logging.info("{} make 1........ ".format(threading.current_thread().getName()))
    logging.info("{} ending=======>".format(len(cups)))


for x in range(10):
    threading.Thread(target=worker, args=(lock, 100)).start()
```
```
INFO:root:Thread-1 make 1........
INFO:root:96
INFO:root:Thread-1 make 1........
INFO:root:97
INFO:root:Thread-1 make 1........
INFO:root:98
INFO:root:Thread-1 make 1........
INFO:root:99
INFO:root:Thread-1 make 1........
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
```
With the lock the data is correct, but locking is genuinely painful: here it reduces the benefit of multithreading to roughly zero, because the lock turns the parallel threads back into a serial sequence, and efficiency drops a lot.
Lock methods:
acquire(blocking=True, timeout=-1): take the lock. Blocking by default; a blocking acquire can be given a timeout. A non-blocking acquire returns True if it got the lock, False otherwise.
This one is fun: when you call acquire(0), everyone "gets" a lock, but if the return value is False, what you are holding is a fake lock.
```python
l = threading.Lock()
r1 = l.acquire(0)
r2 = l.acquire(0)
l.release()
r3 = l.acquire(0)
r4 = l.acquire(0)
logging.info(f'r1>{r1};r2>{r2};r3>{r3};r4>{r4}')
```
```
INFO:root:r1>True;r2>False;r3>True;r4>False
```
Quite interesting: the return value tells you whose lock is real, and everyone can keep trying to take it; once the lock is released, the first acquire to succeed gets True.
A quick digression: Liao Xuefeng's threading.local example:
```python
import threading

# Create a global ThreadLocal object (in practice, each thread that uses it
# effectively gets its own copy of the attributes):
local_school = threading.local()


def process_student():
    # Fetch the student bound to the current thread:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().getName()))


def process_thread(name):
    # Bind the ThreadLocal student:
    local_school.student = name
    process_student()


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
```
This threading.local really does hold an independent set of attributes in every thread. Honestly, if it were me, I'd rather pass the values down through function arguments layer by layer: more direct, cleaner.
And I suspect creating this object costs some extra memory.
For the less commonly used modules below, I'll copy the code from The Python 3 Standard Library by Example and annotate it.
Also, since threads share memory, you can use the queue module to avoid the efficiency hit of locking.
The queue module: `import queue`; usage is the same as the multiprocessing Queue.
class queue.Queue(maxsize=0)  # first in, first out (FIFO)

```python
import queue

q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
Result (first in, first out):
first
second
third
'''
```
class queue.LifoQueue(maxsize=0)  # last in, first out (LIFO)

```python
import queue

q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
Result (last in, first out):
third
second
first
'''
```
class queue.PriorityQueue(maxsize=0)  # a queue whose entries carry a priority

```python
import queue

q = queue.PriorityQueue()
# put() takes a tuple whose first element is the priority (usually a number,
# though any mutually comparable values work); the smaller the value, the higher the priority
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get())
print(q.get())
print(q.get())
'''
Result (smaller number = higher priority; higher priority dequeues first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
```
OK. You can likewise use queue to write a producer-consumer model. Here's one I wrote first:
```python
import threading
import queue
import logging
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)s) => %(message)s'
)


def consumer(q):
    while True:
        res = q.get()
        if res is None:
            logging.debug('Nothing left for me to eat, I am leaving')
            break
        logging.debug(f'I started eating {res}')


def product(q):
    for i in ('pig head', 'milk', 'egg'):
        logging.debug(f'I put in {i}')
        q.put(i)
        time.sleep(2)
    q.put(None)  # the end-of-stream signal


if __name__ == '__main__':
    q = queue.Queue()
    c = threading.Thread(target=consumer, args=(q,))
    p = threading.Thread(target=product, args=(q,))
    c.start()
    p.start()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_queue.py
[DEBUG] (Thread-2) => I put in pig head
[DEBUG] (Thread-1) => I started eating pig head
[DEBUG] (Thread-2) => I put in milk
[DEBUG] (Thread-1) => I started eating milk
[DEBUG] (Thread-2) => I put in egg
[DEBUG] (Thread-1) => I started eating egg
[DEBUG] (Thread-1) => Nothing left for me to eat, I am leaving

Process finished with exit code 0
```
Now back to copying and annotating code.
First up: besides Event, threads can also be synchronized with a Condition object.
```python
import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:        # must hold the condition before waiting on it
        cond.wait()   # wait for the notification
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:        # must also hold the condition before notifying
        logging.debug('Making resource available')
        cond.notifyAll()  # notify all waiters


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
# print(dir(condition))
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threding_condition.py
2019-12-12 02:16:24,131 (c1) Starting consumer thread
2019-12-12 02:16:24,334 (c2) Starting consumer thread
2019-12-12 02:16:24,535 (p ) Starting producer thread
2019-12-12 02:16:24,536 (p ) Making resource available
2019-12-12 02:16:24,536 (c1) Resource is available to consumer
2019-12-12 02:16:24,536 (c2) Resource is available to consumer

Process finished with exit code 0
```
A barrier is another thread-synchronization mechanism. A Barrier establishes a control point: all participating threads block there until every one of them has reached that point.
When the number of waiting threads equals the count you configured, they are all released at once and start working. A rather fun gadget.
```python
import threading
import time


def work(barrier):
    print(threading.current_thread().getName(),
          'waiting for barrier with {} others'.format(barrier.n_waiting))
    worker_id = barrier.wait()  # block until NUM_THREADS threads arrive; the return value is
                                # an index from 0 to parties-1, a bit like list indices, so a
                                # return of 2 means a group of three threads was released
    # print('worker_id_num====>', worker_id)
    print(threading.current_thread().getName(), 'after barrier', worker_id)


NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)  # created outside and passed into the worker

threads = [
    threading.Thread(
        name=f'worker-{i}',
        target=work,
        args=(barrier,),
    )
    for i in range(6)  # 6 threads; if the count were not a multiple of the barrier
                       # size, the leftover threads would block forever
]

for t in threads:
    print(t.getName(), 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
worker-3 starting
worker-3 waiting for barrier with 0 others
worker-4 starting
worker-4 waiting for barrier with 1 others
worker-5 starting
worker-5 waiting for barrier with 2 others
worker-5 after barrier 2
worker-4 after barrier 1
worker-3 after barrier 0

Process finished with exit code 0
```
Next, using abort() to avoid the hang caused by a mismatched thread count.
```python
import threading
import time


def work(barrier):
    print(threading.current_thread().getName(),
          'waiting for barrier with {} others'.format(barrier.n_waiting))
    try:
        worker_id = barrier.wait()  # block until NUM_THREADS threads arrive
    except threading.BrokenBarrierError:
        print(threading.current_thread().getName(), 'aborting')
    else:
        print(threading.current_thread().getName(), 'after barrier', worker_id)


NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name=f'worker-{i}',
        target=work,
        args=(barrier,),
    )
    for i in range(4)  # 4 threads: the last one can never complete a group of 3
]

for t in threads:
    print(t.getName(), 'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()  # very convenient: the stranded last thread gets a
                 # BrokenBarrierError instead of hanging, which except can catch

for t in threads:
    t.join()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier_abort.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
worker-3 starting
worker-3 waiting for barrier with 0 others
worker-3 aborting

Process finished with exit code 0
```
Two more to go. First, limiting concurrent access to a resource. Sometimes you want to allow several worker threads to access a resource at the same time, but cap the total. For example, a connection pool supports simultaneous connections but only a fixed number of them, or a network application may support a fixed number of concurrent downloads. These cases can be managed with a Semaphore.
```python
import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        # without the lock
        # with self.lock:
        self.active.append(name)
        time.sleep(1)
        logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')

    def makeInactive(self, name):
        # with self.lock:
        self.active.remove(name)
        # time.sleep(0.1)
        logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:  # acquire/release would work too
        # time.sleep(0.1)
        name = threading.current_thread().getName()
        print(f'{name} is coming.')
        pool.makeActive(name)
        # time.sleep(0.2)
        pool.makeInactive(name)
        # time.sleep(0.1)
        print(f'{name} is out.')


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)  # at most two concurrent holders


def spend_time(func):
    def warp():
        t1 = time.perf_counter()
        func()
        t2 = time.perf_counter()
        print(f'speng_time is :{t2 - t1:0.5f}')
    return warp


@spend_time
def run():
    threads = []
    for i in range(4):
        t = threading.Thread(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


run()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
0 is coming.
1 is coming.
2019-12-13 03:22:01,429 (0 ) Waiting to join the pool
2019-12-13 03:22:01,429 (1 ) Waiting to join the pool
2019-12-13 03:22:01,430 (2 ) Waiting to join the pool
2019-12-13 03:22:01,430 (3 ) Waiting to join the pool
0 is out.
2 is coming.1 is out.
3 is coming.
2019-12-13 03:22:02,429 (0 ) A0-Running: ['0', '1']
2019-12-13 03:22:02,429 (0 ) I0-Running: ['1']
2019-12-13 03:22:02,430 (1 ) A1-Running: ['1']
2019-12-13 03:22:02,430 (1 ) I1-Running: []
2019-12-13 03:22:03,430 (3 ) A3-Running: ['2', '3']
2019-12-13 03:22:03,431 (3 ) I3-Running: ['2']
2019-12-13 03:22:03,431 (2 ) A2-Running: ['2']
2019-12-13 03:22:03,431 (2 ) I2-Running: []
3 is out.
2 is out.
speng_time is :2.00197

Process finished with exit code 0
```
With the lock:
```python
import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        # now with the lock
        with self.lock:
            self.active.append(name)
            time.sleep(1)
            logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            # time.sleep(0.1)
            logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:  # acquire/release would work too
        # time.sleep(0.1)
        name = threading.current_thread().getName()
        print(f'{name} is coming.')
        pool.makeActive(name)
        # time.sleep(0.2)
        pool.makeInactive(name)
        # time.sleep(0.1)
        print(f'{name} is out.')


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)  # at most two concurrent holders


def spend_time(func):
    def warp():
        t1 = time.perf_counter()
        func()
        t2 = time.perf_counter()
        print(f'speng_time is :{t2 - t1:0.5f}')
    return warp


@spend_time
def run():
    threads = []
    for i in range(4):
        t = threading.Thread(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


run()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
2019-12-13 03:24:15,506 (0 ) Waiting to join the pool
2019-12-13 03:24:15,506 (1 ) Waiting to join the pool
2019-12-13 03:24:15,506 (2 ) Waiting to join the pool
2019-12-13 03:24:15,507 (3 ) Waiting to join the pool
0 is coming.1 is coming.
2019-12-13 03:24:16,507 (1 ) A1-Running: ['1']
2019-12-13 03:24:16,507 (1 ) I1-Running: []
1 is out.
2 is coming.
2019-12-13 03:24:17,509 (0 ) A0-Running: ['0']
2019-12-13 03:24:17,510 (0 ) I0-Running: []
0 is out.
3 is coming.
2019-12-13 03:24:18,512 (2 ) A2-Running: ['2']
2019-12-13 03:24:18,512 (2 ) I2-Running: []
2 is out.
2019-12-13 03:24:19,514 (3 ) A3-Running: ['3']
2019-12-13 03:24:19,515 (3 ) I3-Running: []
3 is out.
speng_time is :4.00896

Process finished with exit code 0
```
From my own experiments, whether to lock depends on the actual need: operations on Python-protected types like lists and dicts need no lock, and adding one costs a lot of efficiency.
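To check the claim about lists, here is a quick experiment of my own. It relies on the CPython implementation detail that a single list.append is atomic under the GIL:

```python
import threading

items = []


def worker():
    for i in range(10_000):
        items.append(i)     # a single list.append is atomic under CPython's GIL


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(items))   # 80000 -- no appends lost even without a lock
```

The lock becomes necessary only when one logical update spans several operations, like the check-then-append in the cups example above.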
While testing, I also noticed that the arguments passed into a thread's target function belong to that call alone; they are local variables and the threads don't interfere with each other through them.
Finally, back to the local class used earlier; following the introduction in The Python 3 Standard Library by Example, a more detailed record. (It is mainly used when functions call functions: each thread operates on its own copy of the attribute values, like each person operating on their own bank balance.)
Some resources need to be locked so that multiple threads can use them; others need to be protected by hiding them from threads that are not their "owners".
The local() class creates an object that hides its values so they cannot be seen from other threads. (Typically used across several layers of function calls: one layer sets the attributes on the object, and every function after that can read them directly.)
Compared with Liao Xuefeng's explanation, I find this one, translated from the professional book, easier to understand.
```python
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(f'value={val}')


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)  # set the attribute on the local object
    show_value(data)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)  # the main thread goes first
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))  # two threads, same local object passed in
    t.start()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_local_book.py
(MainThread) No value yet
(MainThread) value=1000
(Thread-1 ) No value yet
(Thread-1 ) value=90
(Thread-2 ) No value yet
(Thread-2 ) value=62

Process finished with exit code 0
```
The code above shows that each running thread keeps its own attribute values on the object. If you want the object to come with a default attribute in every thread,
you can subclass local and set the attribute during __init__. (Think of bank-account operations: this works nicely in practice; you can mix this class into a hierarchy for multithreaded use and things get much more convenient.)
```python
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(f'value={val}')


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)  # set the attribute on the local object
    show_value(data)


class MyLocal(threading.local):
    # Approach: subclass threading.local
    def __init__(self, value):
        super(MyLocal, self).__init__()
        logging.debug('Initializing %r', self)
        self.value = value  # give the object an initial value


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))  # two threads, same local object passed in
    t.start()
```
```
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_loacl_defaults.py
(MainThread) Initializing <__main__.MyLocal object at 0x1060acc20>
(MainThread) value=1000
(Thread-1 ) Initializing <__main__.MyLocal object at 0x1060acc20>
(Thread-1 ) value=1000
(Thread-1 ) value=53
(Thread-2 ) Initializing <__main__.MyLocal object at 0x1060acc20>
(Thread-2 ) value=1000
(Thread-2 ) value=99

Process finished with exit code 0
```
A final reminder to myself: whether it's an Event, a Condition, a Semaphore, a local, or the rest, the pattern is the same: create and configure the object outside, pass it into the worker functions, and call its methods inside them.
That's it for this first pass. Overall, multithreading and multiprocessing feel very similar to operate. For everyday concurrency, threads are usually the better choice: less memory, faster startup.
Threads also offer more synchronization methods by comparison, and since memory is shared, there is no data-communication problem.