zoukankan      html  css  js  c++  java
  • 网络编程之线程2

    一 concurrent.futures模块

     开启进程池和线程池模块。

      线程池和进程池的区别:进程池开启的数量范围只能在CPU对的核数以内。而线程池的数量可以自己随便的定义,默认为是核数的五倍。

     相关用法:

      ThreadPoolExecutor:创建一个线程池

      ProcessPoolExecutor:创建一个进程池

      Executor:是一个抽象类,上面的两个都是方法继承这个抽象类的

      submit:异步的提交方式。

      map:简化提交的方式,map自带循环,只能单纯的提交,用于int类型,并且没有放回的结果。

      shutdown:提供的一个借口,等待进程池或着线程池执行完毕过后,回收调那里的资源。它里面的wait=True的时候,要等待执行解释后才会有返回结果;如果wait=False,就会理解返回结果,等到任务执行结束后才能回收调进程池或者线程池里面的资源。但是不能提交任务了。

      result:拿到结果。

    # import concurrent.futures
    # import os
    # import time
    # import random
    #
    # def work(n):
    #     print('%s is working'%os.getpid())
    #     time.sleep(random.random())
    #     return n
    #
    # if __name__=='__main__':
    #     excutor=concurrent.futures.ProcessPoolExecutor(4)
    #     futures=[]
    #     for i in range(20):
    #         future=excutor.submit(work,i)
    #         futures.append(future)
    #     excutor.shutdown(wait=True)
    #     print('%s is zhuing'%os.getpid())
    #     for future in futures:
    #         print(future.result())
    

     简写如下:

    # import concurrent.futures
    # import os
    # import time
    # import random
    # def work(n):
    #     print('%s is working' % os.getpid())
    #     time.sleep(random.random())
    #     return n
    # if __name__ == '__main__':
    #     excutor = concurrent.futures.ThreadPoolExecutor(4)
    #     futures = [excutor.submit(work, i) for i in range(20)]
    #     excutor.shutdown(wait=True)
    #     print('%s is zhuing' % os.getpid())
    #     for future in futures:
    #         print(future.result())
    

     回调函数:add_done_callback后面括号里加上一个回调函数,回调函数接收的就是第一个函数返回的一个对像,使用时,还需要在回调函数的内部result提交一下。

    如下:

    # import concurrent.futures
    # import requests
    # import time
    # import random
    # import os
    # def work(url):
    #     print('%s is %s'%(os.getpid(),url))
    #     ret=requests.get(url)
    #     time.sleep(3)
    #     if ret.status_code==200:
    #         print('%s DONE %s'%(os.getpid(),url))
    #         return {'url':url,'text':ret.text}
    # def foo(ret):
    #     ret = ret.result()
    #     print('%s FOO %s'%(os.getpid(),ret['url']))
    #     time.sleep(1)
    #     res='%s;长度:%s'%(ret['url'],len(ret['text']))
    #     with open('a.txt','a',encoding='utf-8')as f:
    #         f.write(res+'
    ')
    #
    # if __name__=='__main__':
    #     url_list = [
    #         'http://tool.chinaz.com/regex/',
    #         'http://www.cnblogs.com/fangjie0410/',
    #         "http://www.cnblogs.com/xuanan",
    #         "http://www.cnblogs.com/bg0131/p/6430943.html",
    #         "http://www.cnblogs.com/wupeiqi/",
    #         "http://www.cnblogs.com/linhaifeng/",
    #         "http://www.cnblogs.com/Eva-J/articles/7125925.html",
    #         "http://www.cnblogs.com/Eva-J/articles/6993515.html",
    #     ]
    #     excutres=concurrent.futures.ProcessPoolExecutor()
    #     for i in url_list:
    #         excutres.submit(work,i).add_done_callback(foo)
    #
    #     print('主',os.getpid())
    

     异常处理:exception:异常接口

    concurrent.futures:自带异常,所以python内置的异常是不能识别出来的。

    raise:抛出异常

    import concurrent.futures
    import os
    import time
    import random
    def work(n):
        print('%s is working' % os.getpid())
        time.sleep(random.random())
        raise Exception
        return n
    if __name__ == '__main__':
        excutor = concurrent.futures.ThreadPoolExecutor(4)
        futures = []
        for i in range(20):
            future = excutor.submit(work, i).result()
            futures.append(future)
        excutor.shutdown(wait=True)
        print('%s is zhuing' % os.getpid())
        for future in futures:
            print(future)
    

     cuncel:取消终止异常

    二 死锁现象和递归锁现象

     什么是死锁:各自拿着对方想要的一把锁,但是各自缺一把锁而不能释放

     Lock:就是互斥锁但是容易出现死锁。只能够acquire一次,只要锁不释放(selease),就不能acquire了。

    死锁现象如下:

    # import threading
    # import time
    # import random
    # l1=threading.Lock()
    # l2=threading.Lock()
    # class Func(threading.Thread):
    #     def run(self):
    #         self.aa()
    #         self.bb()
    #
    #     def aa(self):
    #         l1.acquire()
    #         print(111)
    #         l2.acquire()
    #         print(222)
    #         l2.release()
    #         l1.release()
    #
    #     def bb(self):
    #         l2.acquire()
    #         print(333)
    #         time.sleep(random.random())
    #         l1.acquire()
    #         print(444)
    #         l1.release()
    #         l2.release()
    #
    # if __name__=='__main__':
    #     for i in range(10):
    #         ret=Func()
    #         ret.start()
    

     RLock:递归锁,可以赋值多个变量。

    递归锁现象如下:

    # import threading
    # import time
    # import random
    # r1=r2=threading.RLock()
    # class Func(threading.Thread):
    #     def run(self):
    #         self.aa()
    #         self.bb()
    #
    #     def aa(self):
    #         r1.acquire()
    #         print(111)
    #         r2.acquire()
    #         print(222)
    #         r2.release()
    #         r1.release()
    #
    #     def bb(self):
    #         r1.acquire()
    #         print(333)
    #         time.sleep(random.random())
    #         r2.acquire()
    #         print(444)
    #         r2.release()
    #         r1.release()
    #
    # if __name__=='__main__':
    #     for i in range(10):
    #         ret=Func()
    #         ret.start()
    

     递归锁:没加一次锁,引用技术就会加1,没减一次锁,引用技术就会减一,并且可以同时acquire多次,只要技术不为0,就不能被其他的线程抢到。

    三 信号量

     什么是信号量:其实就是锁,同时能够创建多个锁,实现了一个并发的效果。超出锁的范围的任务就只有等待所得释放,才能够抢到锁。相当于产生一堆新的线程和进程。

      Semaphore:创建信号量,同时管理一个内置的计数器

    # import threading
    # import time
    # import random
    # import os
    # def task(n):
    #     with sm:
    #         time.sleep(random.randint(1, 5))
    #         print('%s is tasking'%threading.current_thread().getName())
    #
    # if __name__=='__main__':
    #     sm=threading.Semaphore(5)
    #     for i in range(20):
    #         t=threading.Thread(target=task,args=(i,))
    #         t.start()
    

     可以指定信号量的个数。

    四 事件Eventr

        1、为什么会有Event?

      线程的一个关键特性就是每个线程的运行都是独立运行且状态不可预测。如果程序中的线程需要通过别的线程的状态来判断自己线程中的  某个程序是否需要执行,那么Event就产生了。

        2、Event的作用?

      threading库中Event对象主要是通过判断自己线程中的Event对象来判断是否唤醒所等待这个Event对象的线程,Event对象包含一个可由  线程设置的信号标志,默认情况下该信号标志为假,如果有别的线程等待这个Event对象时,当他为假时那么这些被等待的线程将一直被阻塞  直到Event对象为真时才会继续执行。如果这个信号标志为真了那么别的线程就会忽略它而继续执行程序。

     什么是时间:两个任务的协同工作,一个任务需要另一个任务的执行结果,如果另一个任务还没有执行结果,那么这个任务就会继续等待。

      Event:创建事件。用于线程之间的通信。Event对象实现了简单的线程之间通信机制,它提供了设置信号,清除信号,等待等用于实现线程间的通信。

      set:设置Event内部的信号为真,发送信号。

      wait:只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。接收信号。

      timeout:wait里面的一个参数,时间参数,等待的时间范围。

      使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假

      is_set:判断是否传入了信号

      isSet:返回event的状态值

           event.clear():     #恢复event的状态值为False,默认情况下就是为Flase。

          

      注释:在多个线程中,当有线程1出现Event.wait()时会判断线程2中Event.isSet()的状态,如果为True,线程1继续执行程序,如  果为Flase,线程1会等待线程2中Event.set()状态设置为True后再继续执行程序。

          线程1是否能执行完毕程序依赖于线程2的Event对象是否为真

    # import threading
    # import time
    # import random
    # e=threading.Event()
    # def work():
    #     print('%s 正在检测'%threading.current_thread().getName())
    #     time.sleep(random.randint(1,5))
    #     e.set()
    #
    # def foo():
    #     count=1
    #     while not e.is_set():
    #         if count > 3:
    #             raise TimeoutError('等待超时')
    #         print('%s 正在等待%s次连接'%(threading.current_thread().getName(),count))
    #         e.wait(timeout=random.randint(1,5))
    #         count+=1
    #     print('%s 正在连接'%threading.current_thread().getName())
    # if __name__=='__main__':
    #     t1=threading.Thread(target=work)
    #     t2=threading.Thread(target=foo)
    #     t1.start()
    #     t2.start()

      如图实例:

        

    五 定时器

     Timer:定时器,是thread的一个派生类,用于在指定的时间后调用调用某个方法。

    # import threading
    # import random
    # def hello(n):
    #     print('hello',n)
    #
    # if __name__=='__main__':
    #     for i in range(20):
    #         t=threading.Timer(random.random(),hello,args=(i,))
    #         t.start()
    

    六 线程queue:队列

        什么是队列?

       数据进行有序的处理,相当于按照顺序处理数据。

     Queue:先放入队列里的数据先读取出来,

      get()和put()方法

        1.get()是发送数据,put()是接收数据,可以在实例化产生对象的时候设置消息队列的长度,即queue.Queue(100).

    # import queue
    # import time
    # import random
    # import threading
    # q=queue.Queue(5)
    # def work(n):
    #     time.sleep(random.randint(1,5))
    #     q.put('%s is working'%n)
    #     print(n)
    #
    # def foo():
    #     time.sleep(random.randint(1,3))
    #     print(q.get())
    #
    # if __name__=='__main__':
    #     for i in range(20):
    #         t1=threading.Thread(target=work,args=(i,))
    #         t2=threading.Thread(target=foo)
    #         t1.start()
    #         t2.start()
    

     PriorityQueue:先读取优先级最高的数据,传参时,以元组的格式传入,前面传入数字,后面传入内容。数字从小到大排的优先级,如果是数字相同,就会按照ascii码从小到大排序

    # import queue
    # import time
    # import random
    # import threading
    # q=queue.PriorityQueue()
    # def work(n):
    #     time.sleep(random.randint(1,5))
    #     q.put('%s is working'%n)
    #     print(n)
    #
    # def foo():
    #     time.sleep(random.randint(1,3))
    #     print(q.get())
    #
    # if __name__=='__main__':
    #     for i in range(20):
    #         t1=threading.Thread(target=work,args=(i,))
    #         t2=threading.Thread(target=foo)
    #         t1.start()
    #         t2.start()
    

     LifoQueue:先进后出,也叫做堆栈。读取数据时,按照时间短的先读取。

    # import queue
    # import random
    # import time
    # import threading
    # q=queue.LifoQueue()
    # def work(n):
    #     time.sleep(random.randint(1, 5))
    #     q.put((random.randint(1,20),'jie_%s'%n))
    #     print(n)
    #
    # def foo():
    #     print(q.get())
    #     time.sleep(random.randint(1,3))
    #
    # if __name__=='__main__':
    #     for i in range(30):
    #         t1=threading.Thread(target=work,args=(1,))
    #         t2=threading.Thread(target=foo)
    #         t1.start()
    #         t2.start()
    

     

    join()和task_done()

        1.如图所示

        

        2.其他用法

          q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False

            q.full() 如果队列满了,返回True,反之False

          q.full 与 maxsize 大小对应

               q.get([block[, timeout]]) 获取队列,timeout等待时间

          q.get_nowait() 相当q.get(False)非阻塞 
          q.put(item) 写入队列,timeout等待时间

               q.put_nowait(item) 相当q.put(item, False)

          q.task_done() 在完成一项工作之后,

          q.task_done() 函数向任务已经完成的队列发送一个信号

          q.join() 实际上意味着等到队列为空,再执行别的操作

      3、其他模式

        1、Python Queue模块的FIFO队列先进先出。class queue.Queue(maxsize)

        2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)

        3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

  • 相关阅读:
    20151019
    20151013
    20150810
    20150626
    20150625
    20150530
    HTML特殊字符大全
    label标签跳出循环
    IIS 负载均衡
    .NET代码执行效率优化
  • 原文地址:https://www.cnblogs.com/fangjie0410/p/7678515.html
Copyright © 2011-2022 走看看