zoukankan      html  css  js  c++  java
  • 学习PYTHON之路, DAY 10 进程、线程、协程篇

    线程

    线程是应用程序中工作的最小单元。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

    直接调用

    import threading
    import time
      
    def show(arg):
        time.sleep(1)
        print 'thread'+str(arg)
      
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()
      
    print 'main thread stop'

    继承式调用

    import threading
    import time
     
     
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
     
        def run(self):#定义每个线程要运行的函数
     
            print("running on number:%s" %self.num)
     
            time.sleep(3)
     
    if __name__ == '__main__':
     
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()

    更多方法:

      • start            线程准备就绪,等待CPU调度
      • setName      为线程设置名称
      • getName      获取线程名称
      • setDaemon   设置为后台线程或前台线程(默认)
                           如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                            如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
      • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
      • run              线程被cpu调度后自动执行线程对象的run方法
     1 import time
     2 import threading
     3  
     4  
     5 def run(n):
     6  
     7     print('[%s]------running----
    ' % n)
     8     time.sleep(2)
     9     print('--done--')
    10  
    11 def main():
    12     for i in range(5):
    13         t = threading.Thread(target=run,args=[i,])
    14         t.start()
    15         t.join(1)
    16         print('starting thread', t.getName())
    17  
    18  
    19 m = threading.Thread(target=main,args=[])
    20 m.setDaemon(True) #将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
    21 m.start()
    22 m.join(timeout=2)
    23 print("---main thread done----")
    守护线程

    线程锁(Lock、RLock)

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

    import threading
    import time
       
    gl_num = 0
       
    lock = threading.RLock()
       
    def Func():
        lock.acquire()
        global gl_num
        gl_num +=1
        time.sleep(1)
        print gl_num
        lock.release()
           
    for i in range(10):
        t = threading.Thread(target=Func)
        t.start()

    信号量(Semaphore)

    互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

     1 import threading,time
     2  
     3 def run(n):
     4     semaphore.acquire()
     5     time.sleep(1)
     6     print("run the thread: %s" %n)
     7     semaphore.release()
     8  
     9 if __name__ == '__main__':
    10  
    11     num= 0
    12     semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
    13     for i in range(20):
    14         t = threading.Thread(target=run,args=(i,))
    15         t.start()

    事件(event)

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    import threading
     
     
    def do(event):
        print 'start'
        event.wait()
        print 'execute'
     
     
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
     
    event_obj.clear()
    inp = raw_input('input:')
    if inp == 'true':
        event_obj.set()

    条件(Condition)

    使得线程等待,只有满足某条件时,才释放n个线程

    def condition_func():
    
        ret = False
        inp = input('>>>')
        if inp == '1':
            ret = True
    
        return ret
    
    
    def run(n):
        con.acquire()
        con.wait_for(condition_func)
        print("run the thread: %s" %n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()

    队列

    class queue.Queue(maxsize=0) #先入先出
    class queue.LifoQueue(maxsize=0) #last in fisrt out 
    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    Queue.qsize()

     

    Queue.empty() #return True if empty  

     

    Queue.full() # return True if full 

     

    Queue.put(itemblock=Truetimeout=None)Queue.put_nowait(item)Equivalent to put(item, False).

     

    Queue.get(block=Truetimeout=None)Queue.get_nowait()

    Equivalent to get(False)

    Queue.task_done()

     1 import time,random
     2 import queue,threading
     3 q = queue.Queue()
     4 def Producer(name):
     5   count = 0
     6   while count <20:
     7     time.sleep(random.randrange(3))
     8     q.put(count)
     9     print('Producer %s has produced %s baozi..' %(name, count))
    10     count +=1
    11 def Consumer(name):
    12   count = 0
    13   while count <20:
    14     time.sleep(random.randrange(4))
    15     if not q.empty():
    16         data = q.get()
    17         print(data)
    18         print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    19     else:
    20         print("-----no baozi anymore----")
    21     count +=1
    22 p1 = threading.Thread(target=Producer, args=('A',))
    23 c1 = threading.Thread(target=Consumer, args=('B',))
    24 p1.start()
    25 c1.start()

    线程池

     1 import queue
     2 import threading
     3 import time
     4 
     5 
     6 class ThreadPool:
     7     def __init__(self, maxsize=5):
     8         self.maxsize = maxsize
     9         self._q = queue.Queue(maxsize)
    10         for i in range(maxsize):
    11             self._q.put(threading.Thread)
    12         # 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】
    13     def get_thread(self):
    14         return self._q.get()
    15 
    16     def add_thread(self):
    17         self._q.put(threading.Thread)
    18 
    19 pool = ThreadPool(5)
    20 
    21 def task(arg,p):
    22     print(arg)
    23     time.sleep(1)
    24     p.add_thread()
    25 
    26 for i in range(100):
    27     # threading.Thread类
    28     t = pool.get_thread()
    29     obj = t(target=task,args=(i,pool,))
    30     obj.start()
    简单版
      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 # Author:Alex Li
      4 import queue
      5 import threading
      6 import contextlib
      7 import time
      8 
      9 StopEvent = object()
     10 
     11 class ThreadPool(object):
     12 
     13     def __init__(self, max_num, max_task_num = None):
     14         if max_task_num:
     15             self.q = queue.Queue(max_task_num)
     16         else:
     17             self.q = queue.Queue()
     18         self.max_num = max_num
     19         self.cancel = False
     20         self.terminal = False
     21         self.generate_list = []
     22         self.free_list = []
     23 
     24     def run(self, func, args, callback=None):
     25         """
     26         线程池执行一个任务
     27         :param func: 任务函数
     28         :param args: 任务函数所需参数
     29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
     30         :return: 如果线程池已经终止,则返回True否则None
     31         """
     32         if self.cancel:
     33             return
     34         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
     35             self.generate_thread()
     36         w = (func, args, callback,)
     37         self.q.put(w)
     38 
     39     def generate_thread(self):
     40         """
     41         创建一个线程
     42         """
     43         t = threading.Thread(target=self.call)
     44         t.start()
     45 
     46     def call(self):
     47         """
     48         循环去获取任务函数并执行任务函数
     49         """
     50         current_thread = threading.currentThread
     51         self.generate_list.append(current_thread)
     52 
     53         event = self.q.get()
     54         while event != StopEvent:
     55 
     56             func, arguments, callback = event
     57             try:
     58                 result = func(*arguments)
     59                 success = True
     60             except Exception as e:
     61                 success = False
     62                 result = None
     63 
     64             if callback is not None:
     65                 try:
     66                     callback(success, result)
     67                 except Exception as e:
     68                     pass
     69 
     70             with self.worker_state(self.free_list, current_thread):
     71                 if self.terminal:
     72                     event = StopEvent
     73                 else:
     74                     event = self.q.get()
     75         else:
     76 
     77             self.generate_list.remove(current_thread)
     78 
     79     def close(self):
     80         """
     81         执行完所有的任务后,所有线程停止
     82         """
     83         self.cancel = True
     84         full_size = len(self.generate_list)
     85         while full_size:
     86             self.q.put(StopEvent)
     87             full_size -= 1
     88 
     89     def terminate(self):
     90         """
     91         无论是否还有任务,终止线程
     92         """
     93         self.terminal = True
     94 
     95         while self.generate_list:
     96             self.q.put(StopEvent)
     97 
     98         self.q.empty()
     99 
    100     @contextlib.contextmanager
    101     def worker_state(self, state_list, worker_thread):
    102         """
    103         用于记录线程中正在等待的线程数
    104         """
    105         state_list.append(worker_thread)
    106         try:
    107             yield
    108         finally:
    109             state_list.remove(worker_thread)
    110 
    111 
    112 pool = ThreadPool(5)
    113 
    114 def callback(status, result):
    115     # status, execute action status
    116     # result, execute action return value
    117     pass
    118 
    119 def action(i):
    120     print(i)
    121 
    122 for i in range(300):
    123     ret = pool.run(action, (i,), callback)
    124 
    125 # time.sleep(5)
    126 # print(len(pool.generate_list), len(pool.free_list))
    127 # print(len(pool.generate_list), len(pool.free_list))
    复杂版

     上下文管理

     1 import contextlib
     2 
     3 @contextlib.contextmanager #加了这个装饰器,可以用with
     4 def work(free_list, worker_thread):
     5     free_list.append(worker_thread)
     6     try:
     7         yield
     8     finally:
     9         free_list.remove(worker_thread)
    10 
    11 free_list = []
    12 worker_thread = '1'
    13 with work(free_list, worker_thread):
    14     print(123)

    运行顺序

    多进程

    from multiprocessing import Process
    import time
    def f(name):
        time.sleep(2)
        print('hello', name)
     
    if __name__ == '__main__': #在windos进程只能做测试,一定要写这句
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    进程间通讯

    Queues

    使用方法跟threading里的queue差不多

    from multiprocessing import Process, Queue
    
    def f(i,q):
        print(i,q.get())
    
    if __name__ == '__main__':
        q = Queue()
    
        q.put("h1")
        q.put("h2")
        q.put("h3")
    
        for i in range(10):
            p = Process(target=f, args=(i,q,))
            p.start()

     Managers

     1 from multiprocessing import Process, Manager
     2  
     3 def f(d, l):
     4     d[1] = '1'
     5     d['2'] = 2
     6     d[0.25] = None
     7     l.append(1)
     8     print(l)
     9  
    10 if __name__ == '__main__':
    11     with Manager() as manager:
    12         d = manager.dict()
    13  
    14         l = manager.list(range(5))
    15         p_list = []
    16         for i in range(10):
    17             p = Process(target=f, args=(d, l))
    18             p.start()
    19             p_list.append(p)
    20         for res in p_list:
    21             res.join()
    22  
    23         print(d)
    24         print(l)

    协程

    协程一个标准定义:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈
    4. 一个协程遇到IO操作自动切换到其它协程

    greenlet

     1 from greenlet import greenlet
     2  
     3  
     4 def test1():
     5     print 12
     6     gr2.switch()
     7     print 34
     8     gr2.switch()
     9  
    10  
    11 def test2():
    12     print 56
    13     gr1.switch()
    14     print 78
    15  
    16 gr1 = greenlet(test1)
    17 gr2 = greenlet(test2)
    18 gr1.switch()

    gevent

     1 import gevent
     2  
     3 def foo():
     4     print('Running in foo')
     5     gevent.sleep(0)
     6     print('Explicit context switch to foo again')
     7  
     8 def bar():
     9     print('Explicit context to bar')
    10     gevent.sleep(0)
    11     print('Implicit context switch back to bar')
    12  
    13 gevent.joinall([
    14     gevent.spawn(foo),
    15     gevent.spawn(bar),
    16 ])
  • 相关阅读:
    Python3之random模块常用方法
    Go语言学习笔记(九)之数组
    Go语言学习笔记之简单的几个排序
    Go语言学习笔记(八)
    Python3之logging模块
    Go语言学习笔记(六)
    123. Best Time to Buy and Sell Stock III(js)
    122. Best Time to Buy and Sell Stock II(js)
    121. Best Time to Buy and Sell Stock(js)
    120. Triangle(js)
  • 原文地址:https://www.cnblogs.com/nikitapp/p/6597398.html
Copyright © 2011-2022 走看看