zoukankan      html  css  js  c++  java
  • Python多线程编程(第二篇)

    一.Python中的上下文管理器(contextlib模块)

    上下文管理器的任务是:代码块执行前准备,代码块执行后收拾

    1、如何使用上下文管理器:

    如何打开一个文件,并写入"hello world"

    filename="my.txt"
    mode="w"
    f=open(filename,mode)
    f.write("hello world")
    f.close()

    当发生异常时(如磁盘写满),就没有机会执行第5行。当然,我们可以采用try-finally语句块进行包装:

    writer=open(filename,mode)
    try:
        writer.write("hello world")
    finally:
        writer.close()

    当我们进行复杂的操作时,try-finally语句就会变得丑陋,采用with语句重写:

    with open(filename,mode) as writer:
        writer.write("hello world")

    as指代了从open()函数返回的内容,并把它赋给了新值。with完成了try-finally的任务。

    2、自定义上下文管理器  

    with语句的作用类似于try-finally,提供一种上下文机制。要应用with语句的类,其内部必须提供两个内置函数__enter__和__exit__。前者在主体代码执行前执行,后者在主体代码执行后执行。as后面的变量,是在__enter__函数中返回的。

    class echo():
        def output(self):
            print "hello world"
        def __enter__(self):
            print "enter"
            return self  #可以返回任何希望返回的东西
        def __exit__(self,exception_type,value,trackback):
            print "exit"
            if exception_type==ValueError:
                return True
            else:
                return Flase
      
    >>>with echo as e:
        e.output()
         
    输出:
    enter
    hello world
    exit

    完备的__exit__函数如下:

    def __exit__(self,exc_type,exc_value,exc_tb)

    其中,exc_type:异常类型;exc_value:异常值;exc_tb:异常追踪信息

    当__exit__返回True时,异常不传播

    3、contextlib模块  

    contextlib模块的作用是提供更易用的上下文管理器,它是通过Generator实现的。contextlib中的contextmanager作为装饰器来提供一种针对函数级别的上下文管理机制,常用框架如下:

    from contextlib import contextmanager
    @contextmanager
    def make_context():
        print('enter')
        try:
            yield "ok"
        except RuntimeError as err:
            print('error',err)
        finally:
            print('exit')
             
        with make_context() as value:
        print value
         
    输出为:
        enter
        ok
        exit

    其中,yield写入try-finally中是为了保证异常安全(能处理异常)as后的变量的值是由yield返回。yield前面的语句可看作代码块执行前操作,yield之后的操作可以看作在__exit__函数中的操作。

    以线程锁为例:

    # -*- coding: utf-8 -*-
    # 2017/11/24 17:22
    from contextlib import contextmanager
    import threading
    
    lock = threading.Lock()
    
    @contextmanager
    def loudLock():
        print('Locking')
        lock.acquire()
        yield
        print('Releasing')
        lock.release()
    
    with loudLock():
        print('Lock is locked: %s' % lock.locked())
        print('Doing something that needs locking')
    
        # Output:
        
        # Locking
        # Lock is locked: True
        # Doing something that needs locking
        # Releasing

    4、contextlib.closing() 

    file类直接支持上下文管理器API,但有些表示打开句柄的对象并不支持,如urllib.urlopen()返回的对象。还有些遗留类,使用close()方法而不支持上下文管理器API。为了确保关闭句柄,需要使用closing()为它创建一个上下文管理器(调用类的close方法)。

    # -*- coding: utf-8 -*-
    # 2017/11/24 17:30
    import contextlib
    
    class myclass():
        def __init__(self):
            print('__init__')
    
        def close(self):
            print('close()')
    
    with contextlib.closing(myclass()):
        print('ok')
        
    输出:
    __init__
    ok
    close()

    二.queue模块

    创建一个“队列”对象
    import Queue
    q = Queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。             class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。   class queue.PriorityQueue(maxsize)

    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)

    q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作

    from random import randrange
    from time import sleep,ctime
    from queue import Queue
    
    import threading
    
    class MyThread(threading.Thread):
        def __init__(self, func, args, name='', verb=False):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
            self.verb = verb
    
        def getResult(self):
            return self.res
    
        def run(self):
            if self.verb:
                print('starting', self.name, 'at:', ctime())
            self.res = self.func(*self.args)
            if self.verb:
                print(self.name, 'finished at:', ctime())
    
    def writeQ(queue):
        print('producing object for Q...', end='')
        queue.put('xxx', 1)
        print("size now", queue.qsize())
    
    def readQ(queue):
        val = queue.get(1)
        print('consumed object from Q... size now', queue.qsize())
    
    def writer(queue, loops):
        for i in range(loops):
            writeQ(queue)
            sleep(randrange(1, 4))
    
    def reader(queue, loops):
        for i in range(loops):
            readQ(queue)
            sleep(randrange(2, 6))
    
    funcs = [writer, reader]
    nfuncs = range(len(funcs))
    
    def main():
        nloops = randrange(2, 6)
        q = Queue(32)
    
        threads = []
        for i in nfuncs:
            t = MyThread(funcs[i], (q, nloops),funcs[i].__name__)
            threads.append(t)
    
        for i in nfuncs:
            threads[i].start()
    
        for i in nfuncs:
            threads[i].join()
    
        print('all DONE')
    
    if __name__ == '__main__':
        main()
    queue

    三 自定义线程池

    # -*- coding: utf-8 -*-
    # 2017/11/25 12:54
    import queue
    import threading
    
    class ThreadPool(object):
        def __init__(self, max_num=20):
            self.queue = queue.Queue(max_num)
            for i in range(max_num):
                self.queue.put(threading.Thread)
    
        def get_thread(self):
            return self.queue.get()
    
        def add_thread(self):
            self.queue.put(threading.Thread)
    
    pool = ThreadPool(10)
    
    def func(arg, p):
        print(arg)
        import time
        time.sleep(2)
        p.add_thread()
    
    for i in range(30):
        thread = pool.get_thread()
        t = thread(target=func, args=(i, pool))
        t.start()
    # -*- coding: utf-8 -*-
    # 2017/11/25 13:00
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    class ThreadPool(object):
        def __init__(self, max_num, max_task_num = None):
            if max_task_num:
                self.q = queue.Queue(max_task_num)
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
            1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread()
            self.generate_list.append(current_thread)
            event = self.q.get()
            while event != StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    result = None
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:
                self.generate_list.remove(current_thread)
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            self.cancel = True
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1
    
        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
            while self.generate_list:
                self.q.put(StopEvent)
            self.q.queue.clear()
    
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread)
            try:
                yield
            finally:
                state_list.remove(worker_thread)
    
    # How to use
    
    pool = ThreadPool(5)
    
    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass
    
    def action(i):
        print(i)
    
    for i in range(30):
        ret = pool.run(action, (i,), callback)
    
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    pool.close()
    pool.terminate()
    edition2
  • 相关阅读:
    20155333 2016-2017-2 《Java程序设计》第十周学习总结
    实验二 面向对象程序设计
    20155333 2016-2017-2 《Java程序设计》第九周学习总结
    20155333 2016-2017-2 《Java程序设计》第八周学习总结
    总结
    家庭作业第三章
    20135316王剑桥 linux第六周课实验笔记
    20135316王剑桥 linux第五周课实验笔记
    20135316王剑桥 linux第四周课实验笔记
    20135316王剑桥 linux第三周课实验笔记
  • 原文地址:https://www.cnblogs.com/ningxin18/p/7891643.html
Copyright © 2011-2022 走看看