zoukankan      html  css  js  c++  java
  • threading模块和queue模块实现程序并发功能和消息队列

    简介:

    通过三个例子熟悉一下python threading模块和queue模块实现程序并发功能和消息队列。

    说明:以下实验基于python2.6

    基本概念

    什么是进程?

    拥有独立的地址空间,内存,数据栈以及记录其运行轨迹的辅助数据。它可以通过fork和spawn操作来完成其它任务。进程间只能使用IPC(进程间通信协议)共享信息。

    什么是线程?

    线程运行在一个主进程中,线程间共享相同的运行环境。只有在多cpu框架中线程才能发挥威力,单cpu中依然是顺序执行的。

    注意问题:

           -线程同时访问同一数据,可能导致数据结果不一致,(race condition)。大多数现场库有同步原语来控制线程的执行和数据访问。

           -线程不可以直接kill,只能sys.exit()或者thread.exit()

    示例一:threading模块实现多线程编程

    threading_1.py

    #!/usr/bin/env python

    import threading

    import datetime

     

    class ThreadClass(threading.Thread):

      def run(self):

        now = datetime.datetime.now()

        print"%s Hello KK at time: %s" % (self.getName(), now)

     

    for i in range(2):

      t = ThreadClass()

      t.start()

    运行结果:

    我们启动了两个线程,所以看到两条输出。

    Thread-1 Hello KK at time: 2012-12-22 20:13:40.150000

    Thread-2 Hello KK at time: 2012-12-22 20:13:40.150000

    解读:

    重定义run方法,把自己的逻辑告诉线程。

    getName()方法是threading模块中获取线程名用的。

    for循环定义了我们将启动几个线程

    start()启动线程。threading中等所有线程准备好后统一启动,而不是一个一个启动。

    示例二:threading+Queue实现线程队列

    threading_2.py

    #!/usr/bin/env python

     

    import Queue

    import threading

    import time

     

    queue = Queue.Queue()

     

    class ThreadNum(threading.Thread):

      """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""

      def __init__(self, queue):

        threading.Thread.__init__(self)

        self.queue = queue

     

      def run(self):

        whileTrue:

          #消费者端,从队列中获取num

          num = self.queue.get()

          print"i'm num %s"%(num)

          time.sleep(1)

          #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号

          self.queue.task_done()

     

    start = time.time()

    def main():

      #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发

      for i in range(10):

        t = ThreadNum(queue)

        t.setDaemon(True)

        t.start()

       

      #往队列中填错数据  

      for num in range(10):

          queue.put(num)

      #wait on the queue until everything has been processed

      queue.join()

     

    main()

    print"Elapsed Time: %s" % (time.time() - start)

    运行结果:

    i'm num 0

    i'm num 1

    i'm num 2

    i'm num 3

    i'm num 4

    i'm num 5

    i'm num 6

    i'm num 7

    i'm num 8

    i'm num 9

    Elapsed Time: 1.01399993896

    解读:

    具体工作步骤描述如下:

    1,创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。

    2,将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。

    3,生成守护线程池。

    4,每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。

    5,在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。

    6,对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。

           在使用这个模式时需要注意一点:通过将守护线程设置为 true,程序运行完自动退出。好处是在退出之前,可以对队列执行 join 操作、或者等到队列为空。

    示例三:多个队列

    所谓多个队列,一个队列的输出可以作为另一个队列的输入!

    threading_3.py

    #!/usr/bin/env python

    import Queue

    import threading

    import time

     

    queue = Queue.Queue()

    out_queue = Queue.Queue()

     

    class ThreadNum(threading.Thread):

        """bkeep"""

        def __init__(self, queue, out_queue):

            threading.Thread.__init__(self)

            self.queue = queue

            self.out_queue = out_queue

     

        def run(self):

            whileTrue:

                #从队列中取消息

                num = self.queue.get()

                bkeep = num

               

                #将bkeep放入队列中

                self.out_queue.put(bkeep)

     

                #signals to queue job is done

                self.queue.task_done()

     

    class PrintLove(threading.Thread):

        """Threaded Url Grab"""

        def __init__(self, out_queue):

            threading.Thread.__init__(self)

            self.out_queue = out_queue

     

        def run(self):

            whileTrue:

                #从队列中获取消息并赋值给bkeep

                bkeep = self.out_queue.get()   

                keke = "I love " + str(bkeep)

                print keke,

                print self.getName()

                time.sleep(1)

     

                #signals to queue job is done

                self.out_queue.task_done()

     

    start = time.time()

    def main():

        #populate queue with data

        for num in range(10):

            queue.put(num)

           

        #spawn a pool of threads, and pass them queue instance

        for i in range(5):

            t = ThreadNum(queue, out_queue)

            t.setDaemon(True)

            t.start()

     

     

        for i in range(5):

            pl = PrintLove(out_queue)

            pl.setDaemon(True)

            pl.start()

     

        #wait on the queue until everything has been processed

        queue.join()

        out_queue.join()

     

    main()

    print"Elapsed Time: %s" % (time.time() - start)

    运行结果:

    I love 0 Thread-6

    I love 1 Thread-7

    I love 2 Thread-8

    I love 3 Thread-9

    I love 4 Thread-10

    I love 5 Thread-7

    I love 6 Thread-6

    I love 7 Thread-9

    I love 8 Thread-8

    I love 9 Thread-10

    Elapsed Time: 2.00300002098

    解读:

    ThreadNum 类工作流程

    定义队列--->继承threading---->初始化queue---->定义run函数--->get queue中的数据---->处理数据---->put数据到另外一个queue-->发信号告诉queue该条处理完毕

    main函数工作流程:

    --->往自定义queue中扔数据

    --->for循环确定启动的线程数---->实例化ThreadNum类---->启动线程并设置守护

    --->for循环确定启动的线程数---->实例化PrintLove类--->启动线程并设置为守护

    --->等待queue中的消息处理完毕后执行join。即退出主程序。

    附1:threading模块对象

    Thread          表示一个线程的执行对象

    Lock              锁原语对象

    RLock           可重入锁对象。使单线程可以再次获得已经获得了的锁

    Condition      条件变量。可以让一个线程停下来等待满某个条件

    Event            通用条件变量。多个线程同时等待某个事件,事件发生后激活所有线程

    Semaphore          为等待锁的线程提供一个类似“候车室”的结构

    BoundedSemaphore   与Semaphore类似,但它不运行超过初始值

    Timer            与Thread相似,只是它要等待一段时间后才开始运行。

    activeCount()      返回活动的线程对象的数量

    currentThread()   返回当前线程对象

    enumerate()         返回当前活动线程的列表

    settrace(func)             为所有线程设置一个跟踪函数

    setprofile(func)    为所有线程设置一个profile函数

    附2:生产者-消费者模型

    生产者把货物放入队列中,消费者消耗队列中的货物

    queue模块就可以实现这个模型,实现线程间通信。

    Queue模块函数:

    queue(size)  创建一个大小为size的Queue对象

    Queue对象函数:

    qsize()   返回队列的大小,近似值

    empty()  如果队列为空,则返回True。否则返回Fales

    full()              呵呵,大家想一下就知道了。

    put(item,block=0) 把item放到队列中,如果给了block且不为0,函数会一直阻塞到队列中有空间为止

    get(block=0) 从队列中取一个对象,如果给了block且不为0,函数会一直阻塞到队列中有对象为止。

  • 相关阅读:
    解决xcode5升级后,Undefined symbols for architecture arm64:问题
    第8章 Foundation Kit介绍
    app 之间发送文件 ios
    iphone怎么检测屏幕是否被点亮 (用UIApplication的Delegate)
    CRM下载对象一直处于Wait状态的原因
    错误消息Customer classification does not exist when downloading
    How to resolve error message Distribution channel is not allowed for sales
    ABAP CCDEF, CCIMP, CCMAC, CCAU, CMXXX这些东东是什么鬼
    有了Debug权限就能干坏事?小心了,你的一举一动尽在系统监控中
    SAP GUI和Windows注册表
  • 原文地址:https://www.cnblogs.com/lly-lly/p/5417582.html
Copyright © 2011-2022 走看看