zoukankan      html  css  js  c++  java
  • 线程

    多进程和多线程选择

        一般来说IO密集型用线程(python中线程无法夸cpu执行,i在IO操作只会少许占用cpu,线程安全),计算密集型用多进程(多进程可以跑在不同cpu上)。线程带GIL锁,同时操作IO时候只有一个可以成功操作。

        多进程和线程开多少合适? 理论上多进程个数可以等于cpu核数,线程数量看业务而定,cpu在线程上切换也会占用时间(上下文切换)

    thread方法说明

    t.start() : 激活线程,

    t.getName() : 获取线程的名称

    t.setName() : 设置线程的名称 

    t.name : 获取或设置线程的名称

    t.is_alive() : 判断线程是否为激活状态

    t.isAlive() :判断线程是否为激活状态

    t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    t.isDaemon() : 判断是否为守护线程

    t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

    t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    t.run() :线程被cpu调度后自动执行线程对象的run方法

    简单线程操作

     1 #-*- coding:utf-8 -*-
     2 import time
     3 import threading
     4 
     5 
     6 def f0():pass
     7 
     8 
     9 def f1(*args):
    10     time.sleep(10)
    11     f0()
    12 
    13 #线程执行前准备
    14 t= threading.Thread(target=f1,args=(111,222))
    15 #开始执行线程
    16 t.start()
    17 
    18 
    19 t= threading.Thread(target=f1,args=(111,222))
    20 t.start()
    21 t= threading.Thread(target=f1,args=(111,222))
    22 t.start()

    线程锁

    我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。

     1 #-*- coding:utf-8  -*-
     2 import threading
     3 import time
     4  
     5 globals_num = 0
     6  
     7 lock = threading.RLock()
     8  
     9 def Func():
    10     lock.acquire()  # 获得锁 
    11     global globals_num
    12     globals_num += 1
    13     time.sleep(1)
    14     print(globals_num)
    15     lock.release()  # 释放锁 
    16  
    17 for i in range(10):
    18     t = threading.Thread(target=Func)
    19     t.start()
    View Code

    threading.RLock和threading.Lock 的区别

    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

    线程之间通知(Event)

    Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

    • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
    • Event.set() :将标识位设为Ture
    • Event.clear() : 将标识位设为False。
    • Event.isSet() :判断标识位是否为Ture。
     1 #-*- coding:utf-8  -*-
     2 import threading
     3 
     4 def do(event):
     5     print('start')
     6     #线程在这里阻塞了。等待event_obj.set()方法才能继续执行线程
     7     event.wait()
     8     print('execute')
     9 
    10 event_obj = threading.Event()
    11 for i in range(10):#加锁
    12     t = threading.Thread(target=do,args=(event_obj,))
    13     t.start()
    14 
    15 #event_obj.clear()设置为flase 线程就阻塞了。
    16 event_obj.clear()
    17 
    18 inp = raw_input('input:')#放锁
    19 print(inp)
    20 if inp == 'true':
    21     #发送event事件,线程有阻塞变成无阻塞
    22     event_obj.set()
    event

    当某事过程完毕后改变事件,被阻塞的线程将继续执行

    threading.Condition:

    一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。

    condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。

    其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

    Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

    • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

    如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

    注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。

    在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。 例子: 生产者-消费者模型

     1 #-*- coding:utf-8 -*-
     2 import threading
     3 import time
     4 
     5 
     6 def consumer(cond):
     7     #不管释放锁还是加锁都需要先用with
     8     with cond:
     9         print("consumer before wait")
    10         #等待解锁,如果没写cond.wait()这个方法程序会往下执行
    11         cond.wait()
    12         print("consumer after wait")
    13 
    14 
    15 #释放锁
    16 def producer(cond):
    17     with cond:
    18         print("producer before notifyAll")
    19         #释放锁,这个必须是同一个condition对象
    20         cond.notifyAll()
    21         print("producer after notifyAll")
    22 
    23 #定义condition对象
    24 condition = threading.Condition()
    25 
    26 c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
    27 c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
    28 
    29 p = threading.Thread(name="p", target=producer, args=(condition,))
    30 
    31 c1.start()
    32 time.sleep(2)
    33 c2.start()
    34 time.sleep(2)
    35 
    36 #释放锁
    37 p.start()
    condition

    自定义线程池(线程池默认没有)

    一简单的方法实现线程池

     1 #-*- coding:utf-8 -*-
     2 import  threading
     3 import time
     4 import Queue
     5 
     6 
     7 #线程池
     8 class ThreadPool(object):
     9 
    10     def __init__(self,max_num=20):
    11         #创建队列默认20个
    12         self.queue = Queue.Queue(max_num)
    13 
    14         #增加线程方法到Queue里面
    15         for i in xrange(max_num):
    16             self.queue.put(threading.Thread)
    17 
    18     #获取队列线程
    19     def get_thread(self):
    20         return self.queue.get()
    21 
    22     def add_thread(self):
    23         self.queue.put(threading.Thread)
    24 
    25 
    26 def func(pool,num):
    27     time.sleep(1)
    28     print(num)
    29     #取一个线程就需要往队列中在放一个线程
    30     pool.add_thread()
    31 
    32 p = ThreadPool(5)
    33 
    34 for i in range(10):
    35     thread=p.get_thread()
    36     t= thread(target=func,args=(p,i,))
    37     t.start()
    View Code

    二增强版线程池

      1 #-*- coding:utf-8 -*-
      2 import  threading
      3 import time
      4 import Queue
      5 import contextlib
      6 
      7 StopEvent=object()
      8 
      9 class ThreadPool(object):
     10     def __init__(self,max_num):
     11         #这个Q是无限大的用来接受任务
     12         self.q = Queue.Queue()
     13 
     14         #执行线程计数器
     15         self.work_num =0
     16 
     17         #为True线程会立即终止执行
     18         self.terminal = False
     19 
     20         #最多创建的线程数
     21         self.max_num = max_num
     22 
     23         #真实创建的线程列表
     24         self.generate_list = []
     25 
     26         #空闲线程列表
     27         self.free_list = []
     28 
     29     def run(self,func,args,cllback=None):
     30 
     31         #w元组类型存储的执行函数,参数,callback函数。put到任务队列中去
     32         w = (func,args,cllback)
     33         self.q.put(w)
     34 
     35         #判断是否创建新的线程
     36         #1空线程等于0
     37         #2工作线程小于最大线程数
     38         if len(self.free_list) == 0 and len(self.generate_list)<self.max_num:
     39            #创建线程
     40            self.generate_thread()
     41 
     42     #创建线程
     43     def generate_thread(self):
     44         #启动一个线程然后调用self.call方法
     45         t = threading.Thread(target=self.call)
     46         t.start()
     47 
     48     #当任务执行完毕后往队列中增加停止符
     49     def close(self):
     50         while self.generate_list:
     51             self.q.put(StopEvent)
     52 
     53 
     54     def terminate(self):
     55         self.terminal = True
     56         self.close()
     57         #清空队列.这里已经终止队列中可能还有很多任务没有完成
     58         self.q.empty()
     59 
     60     #做任务
     61     def call(self):
     62         #获取当前线程池
     63         current_thread = threading.currentThread
     64         #加入工作列表
     65         self.generate_list.append(current_thread)
     66 
     67         #获取q中的任务,没有任务get就会阻塞线程
     68         event = self.q.get()
     69 
     70         #增加计数器
     71         self.work_num +=1
     72 
     73         while event != StopEvent:
     74              #获取的event 如果是元组那么它就是任务,其它格式肯定不是任务
     75              #解开任务包
     76              func, args ,callable = event
     77 
     78 
     79              try:
     80                     #获取执行方法的返回值
     81                     result = func(*args)
     82                     #状态
     83                     status = True
     84              except Exception as e:
     85                     #func函数执行过程中报异常了,这里捕获一下
     86                     result = e
     87                     #状态改为false
     88                     status = False
     89 
     90              #判断一下是否需要回调
     91              if callable is not None:
     92                  try:
     93                      #执行回调函数,并且把结果当参数传递到callable中
     94                      #回调函数中只需要判断下status就知道func函数是否执行成功
     95                      callable(status,result)
     96                  except Exception as e:
     97                      pass
     98              if self.terminal:
     99                  event = StopEvent
    100              else:
    101                  #标记,free_list空闲列表中
    102                  self.free_list.append(current_thread)
    103 
    104                  #获取任务,如果没有get方法将阻塞线程,等待任务到来
    105                  event = self.q.get()
    106 
    107                  #从空闲队列中移除线程
    108                  self.free_list.remove(current_thread)
    109 
    110         else:#这里event == StopEvent 就执行这里,说明event不是元组
    111             self.generate_list.remove(current_thread)
    112 
    113 
    114 def f1(args):
    115     import time
    116     time.sleep(0.5)
    117     print(args)
    118 
    119 #线程池初始线程设置
    120 pool = ThreadPool(10)
    121 
    122 for i in range(5):
    123     pool.run(f1,(i,))
    124 
    125 #增加停止符
    126 #pool.close()
    127 
    128 #立即停止
    129 pool.terminate()
    130 
    131 #查看用了多少个工作线程
    132 time.sleep(3)
    133 print('
    work:%d'%pool.work_num)
    View Code

    用with改造下面流程:

    self.free_list.append(current_thread)
    event = self.q.get()
    self.free_list.remove(current_thread)

    增加ThreadPool类中的一个方法:

    1 #上下文管理函数
    2     @contextlib.contextmanager
    3     def work_status(self,current_thread):
    4         self.free_list.append(current_thread)
    5         try:
    6               yield
    7         finally:
    8             self.free_list.remove(current_thread)
    View Code

    将上面改造的三行代码改成:

    1 with self.work_status(current_thread):
    2                      event = self.q.get()
    View Code
  • 相关阅读:
    Linux修改环境变量的方法
    读书笔记:高性能网站建设
    xtrabackup备份还原
    自制mysql.rpm安装包
    python装饰器
    python中闭包
    python中返回函数
    python中自定义排序函数
    python中filter()函数
    python中reduce()函数
  • 原文地址:https://www.cnblogs.com/menkeyi/p/7089297.html
Copyright © 2011-2022 走看看