zoukankan      html  css  js  c++  java
  • 多线程实践—Python多线程编程

    多线程实践

    前面的一些文章和脚本都是只能做学习多线程的原理使用,实际上什么有用的事情也没有做。接下来进行多线程的实践,看一看在实际项目中是怎么使用多线程的。

    图书排名示例

    Bookrank.py:

    该脚本通过单线程进行下载图书排名信息的调用


     1 from atexit import register
     2 from re import compile
     3 from threading import Thread
     4 from time import sleep, ctime
     5 import requests
     6  7 REGEX = compile('#([d,]+) in Books')
     8 AMZN = 'https://www.amazon.com/dp/'
     9 ISBNS = {
    10     '0132269937': 'Core Python Programming',
    11     '0132356139': 'Python Web Development with Django',
    12     '0137143419': 'Python Fundamentals',
    13 }
    14 15 def getRanking(isbn):
    16     url = '%s%s' % (AMZN, isbn)
    17     page = requests.get(url)
    18     data = page.text
    19     return REGEX.findall(data)[0]
    20 21 def _showRanking(isbn):
    22     print '- %r ranked %s' % (
    23         ISBNS[isbn], getRanking(isbn))
    24 25 def _main():
    26     print 'At', ctime(), 'on Amazon'
    27     for isbn in ISBNS:
    28         _showRanking(isbn)
    29 30 @register
    31 def _atexit():
    32     print 'all DONE at:', ctime()
    33 34 if __name__ == '__main__':
    35     _main()
    36
     

    输出结果为:

    1 /usr/bin/python ~/Test_Temporary/bookrank.py
    2 At Sat Jul 28 17:16:51 2018 on Amazon
    3 - 'Core Python Programming' ranked 322,656
    4 - 'Python Fundamentals' ranked 4,739,537
    5 - 'Python Web Development with Django' ranked 1,430,855
    6 all DONE at: Sat Jul 28 17:17:08 2018
     

    引入线程

    上面的例子只是一个单线程程序,下面引入线程,并使用多线程再执行程序对比各自所需的时间。

    ​ 将上面脚本中 _main() 函数的 _showRanking(isbn) 修改以下代码:

    ​ 

    Thread(target=_showRanking, args=(isbn,)).start()

    再次执行查看返回结果:

    1 /usr/bin/python ~/Test_Temporary/bookrank.py
    2 At Sat Jul 28 17:39:16 2018 on Amazon
    3 - 'Python Fundamentals' ranked 4,739,537
    4 - 'Python Web Development with Django' ranked 1,430,855
    5 - 'Core Python Programming' ranked 322,656
    6 all DONE at: Sat Jul 28 17:39:19 2018

    从两个的输出结果中可以看出,使用单线程时总体完成的时间为 7s ,而使用多线程时,总体完成时间为 3s 。另外一个需要注意的是,单线程版本是按照变量的顺序输出,而多线程版本按照完成的顺序输出。

    同步原语

    一般在多线程代码中,总会有一些特定的函数或代码块不希望(或不应该)被多个线程同时执行,通常包括修改数据库、更新文件或其它会产生竟态条件的类似情况。这就是需要使用同步的情况。

    • 当任意数量的线程可以访问临界区的代码,但给定的时刻只有一个线程可以通过时,就是使用同步的时候了;

    • 程序员选择适合的同步原语,或者线程控制机制来执行同步;

    • 进程同步有不同的类型【参见:https://en.wikipedia.org/wiki/Synchronization_(computer_science) 】

    • 同步原语有:锁/互斥、信号量。锁是最简单、最低级的机制,而信号量用于多线程竞争有限资源的情况。

    锁示例

    锁有两种状态:锁定和未锁定。而且它也只支持两个函数:获得锁和释放锁。

    • 当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码;

    • 所有之后到达的线程将被阻塞,直到第一个线程结束退出临界区并释放锁;

    • 锁被释放后,其它等待的线程可以继续争夺锁,并进入临界区;

    • 被阻塞的线程没有顺序,不会先到先得,胜出的线程是不确定的。

    代码示例(mtsleepF.py):

    *注:该脚本派生了随机数量的线程,每个线程执行结束时会进行输出

     1 # -*- coding=utf-8 -*-
     2 from atexit import register
     3 from random import randrange
     4 from threading import Thread, currentThread
     5 from time import sleep, ctime
     6  7 class CleanOutputSet(set):
     8     def __str__(self):
     9         return ', '.join(x for x in self)
    10 11 loops = (randrange(2, 5) for x in range(randrange(3, 7)))
    12 remaining = CleanOutputSet()
    13 14 def loop(nsec):
    15     myname = currentThread().name
    16     remaining.add(myname)
    17     print('这个是目前线程池中的线程:', remaining)
    18     print('[%s] Started %s' % (ctime(), myname))
    19     sleep(nsec)
    20     remaining.remove(myname)
    21     print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
    22     print(' (remaining: %s)' % (remaining or 'None'))
    23 24 def _main():
    25     for pause in loops:
    26         Thread(target=loop, args=(pause,)).start()
    27 28 @register
    29 def _atexit():
    30     print('all DONE at:%s' % ctime())
    31 32 if __name__ == '__main__':
    33     _main()
     

    执行后的输出结果:

     1 /usr/local/bin/python3.6 /Users/zhenggougou/Project/Test_Temporary/mtsleepF.py
     2 这个是目前线程池中的线程: Thread-1
     3 [Sat Jul 28 21:09:44 2018] Started Thread-1
     4 这个是目前线程池中的线程: Thread-2, Thread-1
     5 [Sat Jul 28 21:09:44 2018] Started Thread-2
     6 这个是目前线程池中的线程: Thread-3, Thread-2, Thread-1
     7 [Sat Jul 28 21:09:44 2018] Started Thread-3
     8 这个是目前线程池中的线程: Thread-3, Thread-2, Thread-4, Thread-1
     9 [Sat Jul 28 21:09:44 2018] Started Thread-4
    10 这个是目前线程池中的线程: Thread-5, Thread-4, Thread-3, Thread-2, Thread-1
    11 [Sat Jul 28 21:09:44 2018] Started Thread-5
    12 这个是目前线程池中的线程: Thread-5, Thread-6, Thread-4, Thread-3, Thread-2, Thread-1
    13 [Sat Jul 28 21:09:44 2018] Started Thread-6
    14 [Sat Jul 28 21:09:46 2018] Completed Thread-2 (2 secs)
    15 [Sat Jul 28 21:09:46 2018] Completed Thread-1 (2 secs)
    16 [Sat Jul 28 21:09:46 2018] Completed Thread-3 (2 secs)
    17  (remaining: Thread-5, Thread-6, Thread-4)
    18 [Sat Jul 28 21:09:46 2018] Completed Thread-6 (2 secs)
    19  (remaining: Thread-5, Thread-4)
    20 [Sat Jul 28 21:09:46 2018] Completed Thread-4 (2 secs)
    21  (remaining: Thread-5)
    22  (remaining: Thread-5)
    23 [Sat Jul 28 21:09:46 2018] Completed Thread-5 (2 secs)
    24  (remaining: None)
    25  (remaining: None)
    26 all DONE at:Sat Jul 28 21:09:46 2018

    从执行结果中可以看出,有的时候可能会存在多个线程并行执行操作删除 remaining 集合中数据的情况。比如上面结果中,线程1、2、3 就是同时执行去删除集合中数据的。所以为了避免这种情况需要加锁,通过引入 Lock (或 RLock),然后创建一个锁对象来保证数据的修改每次只有一个线程能操作。

    1. 首先先导入锁类,然后创建锁对象

      from threading import Thread, Lock, currentThread

      lock = Lock()

    2. 然后使用创建的锁,将上面 mtsleepF.py 脚本中 loop() 函数做以下改变:

       1 def loop(nsec):
       2     myname = currentThread().name
       3     lock.acquire() # 获取锁
       4     remaining.add(myname)
       5     print('这个是目前线程池中的线程:', remaining)
       6     print('[%s] Started %s' % (ctime(), myname))
       7     lock.release() # 释放锁
       8     sleep(nsec)
       9     lock.acquire() # 获取锁
      10     remaining.remove(myname)
      11     print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
      12     print(' (remaining: %s)' % (remaining or 'None'))
      13     lock.release() # 释放锁

    在操作变量的前后需要进行获取锁和释放锁的操作,以保证在修改变量时只有一个线程进行。上面的代码有两处修改变量,一是:remaining.add(myname) ,二是:remaining.remove(myname)。 所以上面代码中有两次获取锁和释放锁的操作。其实还有一种方案可以不再调用锁的 acquire()release() 方法,二是使用上下文管理,进一步简化代码。代码如下:

     1 def loop(nesc):
     2     myname = currentThread().name
     3     with lock:
     4         remaining.add(myname)
     5         print('[{0}] Started {1}'.format(ctime(), myname))
     6     sleep(nesc)
     7     with lock:
     8         remaining.remove(myname)
     9         print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nesc))
    10         print(' (remaining: {0})'.format(remaining or 'None'))
     

    信号量示例

    锁非常易于理解和实现,也很容易决定何时需要它们,然而,如果情况更加复杂,可能需要一个更强大的同步原语来代替锁。

    信号量是最古老的同步原语之一。它是一个计数器,当资源消耗时递减,当资源释放时递增。可以认为信号量代表它们的资源可用或不可用。信号量比锁更加灵活,因为可以有多个线程,每个线程都拥有有限资源的一个实例。

    • 消耗资源使计数器递减的操作习惯上称为 P() —— acquire ;

    • 当一个线程对一个资源完成操作时,该资源需要返回资源池中,这个操作一般称为 V() —— release 。

    示例,糖果机和信号量(candy.py):

    *注:该脚本使用了锁和信号量来模拟一个糖果机

     1 # -*- coding=utf-8 -*-
     2 from atexit import register
     3 from random import randrange
     4 from threading import BoundedSemaphore, Lock, Thread
     5 from time import sleep, ctime
     6  7 lock = Lock()
     8 MAX = 5
     9 candytray = BoundedSemaphore(MAX)
    10 11 def refill():
    12     lock.acquire()
    13     print('Refilling candy')
    14     try:
    15         candytray.release() # 释放资源
    16     except ValueError:
    17         print('full, skipping')
    18     else:
    19         print('OK')
    20     lock.release()
    21 22 def buy():
    23     lock.acquire()
    24     print('Buying candy...')
    25     if candytray.acquire(False): # 消耗资源
    26         print('OK')
    27     else:
    28         print('empty, skipping')
    29     lock.release()
    30 31 def producer(loops):
    32     for i in range(loops):
    33         refill()
    34         sleep(randrange(3))
    35 36 def consumer(loops):
    37     for i in range(loops):
    38         buy()
    39         sleep(randrange(3))
    40 41 def _main():
    42     print('starting at:{0}'.format(ctime()))
    43     nloops = randrange(2, 6)
    44     print('THE CANDY MACHINE (full with %d bars)!' % MAX)
    45     Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()
    46     Thread(target=producer, args=(nloops,)).start()
    47 48 @register
    49 def _atexit():
    50     print('all DONE at:{0}'.format(ctime()))
    51 52 if __name__ == '__main__':
    53     _main()

    执行结果为:

     1 /usr/local/bin/python3.6 ~/Test_Temporary/candy.py
     2 starting at:Sun Jul 29 21:12:50 2018
     3 THE CANDY MACHINE (full with 5 bars)!
     4 Buying candy...
     5 OK
     6 Refilling candy
     7 OK
     8 Refilling candy
     9 full, skipping
    10 Buying candy...
    11 OK
    12 Buying candy...
    13 OK
    14 all DONE at:Sun Jul 29 21:12:52 2018
  • 相关阅读:
    MySQL length函数
    MySQL between ... and ...
    MySQL Group By使用
    MySQL 聚合函数/分组函数
    MySQL where与like
    MySQL order by与distinct
    城市网络
    滑动窗口
    合并回文子串(NC13230)
    NC50439
  • 原文地址:https://www.cnblogs.com/tester-xt/p/9387971.html
Copyright © 2011-2022 走看看