  • Python concurrency series (2): Python threads, thread pools, and their implementation

    I. Multithreading Implementation

    The threading module

    Multithreading covers three main topics: direct thread operations, thread synchronization, and queue-based multithreading.

    Python 3 provides thread support through two standard-library modules: _thread and threading.

    _thread offers low-level, primitive threads and a simple lock; its functionality is limited compared with threading.

    The threading module wraps _thread and additionally provides the following functions:

    • threading.current_thread() (formerly currentThread()): returns the current Thread object.
    • threading.enumerate(): returns a list of all running threads, i.e. threads that have started and not yet finished; threads not yet started or already terminated are excluded.
    • threading.active_count() (formerly activeCount()): returns the number of running threads, which equals len(threading.enumerate()).
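A minimal sketch of these module-level helpers in action (using the modern snake_case names; the thread name "demo" is my own choice):

```python
import threading
import time

def worker():
    time.sleep(0.5)

t = threading.Thread(target=worker, name="demo")
t.start()

# enumerate() lists the main thread plus every started, unfinished thread
running = [th.name for th in threading.enumerate()]
print(threading.current_thread().name)                         # MainThread
print("demo" in running)                                       # True
print(threading.active_count() == len(threading.enumerate()))  # True
t.join()
```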

    Beyond these functions, the module provides the Thread class for working with threads. Thread offers the following methods:

    • run(): the method representing the thread's activity.
    • start(): starts the thread's activity.
    • join([timeout]): waits until the thread terminates. This blocks the calling thread until the joined thread finishes, either normally or through an unhandled exception, or until the optional timeout expires.
    • is_alive() (formerly isAlive()): returns whether the thread is alive.
    • getName(): returns the thread name (the name attribute is preferred in modern code).
    • setName(): sets the thread name.

    1. The simplest multithreading example

    
    
    import threading
    import time
    def target():
        print("the current threading %s is running"
           %(threading.current_thread().name))
        time.sleep(1)
        print("the current threading %s is ended"%(threading.current_thread().name))
        
    print("the current threading %s is running"%(threading.current_thread().name))
    ## the part belonging to thread t
    t = threading.Thread(target=target)
    t.start()
    ## the part belonging to thread t
    t.join() # join blocks the calling thread (here, the main thread); the main thread only ends after Thread-1 has finished
    print("the current threading %s is ended"%(threading.current_thread().name))
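For completeness (this detail is not in the original): Thread can also pass positional arguments to the target function via args:

```python
import threading

results = []

def add(a, b):
    # each thread appends its own result; list.append is thread-safe in CPython
    results.append(a + b)

t = threading.Thread(target=add, args=(2, 3))
t.start()
t.join()
print(results)  # [5]
```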

    2. Creating threads by subclassing threading.Thread

    To create threads with the threading module, inherit directly from threading.Thread and override the __init__ and run methods:

    import threading
    import time
    
    class myThread(threading.Thread):  # subclass threading.Thread
       def __init__(self, threadID, name, counter):
          threading.Thread.__init__(self)
          self.threadID = threadID
          self.name = name
          self.counter = counter
    
       def run(self):  # put the thread's work in run(); it executes after start() is called
          print("Starting " + self.name)
          print_time(self.name, self.counter, 5)
          print("Exiting " + self.name)
    
    
    def print_time(threadName, delay, counter):
       while counter:
          time.sleep(delay)
          print("%s process at: %s" % (threadName, time.ctime(time.time())))
          counter -= 1
    
    
    # create the new threads
    thread1 = myThread(1, "Thread-1", 1)
    thread2 = myThread(2, "Thread-2", 2)
    
    # start the threads
    thread1.start()
    thread2.start()
    
    # wait for the threads to finish
    thread1.join()
    thread2.join()
    
    print("Exiting Main Thread")

    3. After the main thread exits, the process waits for all non-daemon child threads before terminating

    When a process starts, it gets a main thread by default. Threads created by the main thread are, by default, not daemon threads (daemon=False).

    So after the main thread finishes, its child threads keep running, and the process waits for all of them to finish before it terminates.

    All threads share one terminal output (the terminal of the process they belong to).

    import threading
    import time
    
    
    def child_thread1():
        for i in range(100):
            time.sleep(1)
            print('child_thread1_running...')
    
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread1.start()
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()

    Output:

    parent_thread_running...
    parent_thread_exit...
    child_thread1_running...
    child_thread1_running...
    child_thread1_running...
    child_thread1_running...
    ...

    As shown, the child thread keeps running after the parent thread has finished; it is only terminated when the process itself is killed.

    4. The process does not wait for daemon threads after the main thread ends; the program exits immediately

    When a thread is marked as a daemon thread, its process will not wait for it to finish; the process terminates as soon as all non-daemon threads are done.

    import threading
    import time
    
    
    def child_thread1():
        for i in range(100):
            time.sleep(1)
            print('child_thread1_running...')
    
    
    def child_thread2():
        for i in range(5):
            time.sleep(1)
            print('child_thread2_running...')
    
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        thread1.daemon = True  # setDaemon() is deprecated; assign the daemon attribute instead
        thread1.start()
        thread2.start()
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()

    Output:

    parent_thread_running...
    parent_thread_exit...
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread2_running...child_thread1_running...
    
    Process finished with exit code 0

    thread1 is a daemon thread and thread2 is not, so the process waits for thread2 to finish but not for thread1.

    Note: a child thread inherits its creator's daemon flag, so threads started from a daemon thread are daemons themselves.
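This inheritance rule can be checked with a small sketch (an illustration of my own, not from the original):

```python
import threading

observed = []

def child():
    # a thread created here inherits the daemon flag of its creator
    grandchild = threading.Thread(target=lambda: None)
    observed.append(grandchild.daemon)

t = threading.Thread(target=child, daemon=True)
t.start()
t.join()
print(observed)  # [True]
```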

    5. Making the main thread wait for child threads

    Calling B.join() inside thread A blocks A at the call site until thread B has finished.

    import threading
    import time
    
    
    def child_thread1():
        for i in range(10):
            time.sleep(1)
            print('child_thread1_running...')
    
    
    def child_thread2():
        for i in range(5):
            time.sleep(1)
            print('child_thread2_running...')
    
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        thread1.daemon = True  # setDaemon() is deprecated; assign the daemon attribute instead
        thread2.daemon = True
        thread1.start()
        thread2.start()
        thread2.join()
        1/0
        thread1.join()
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()

    Output:

    parent_thread_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    Traceback (most recent call last):
      File "E:/test_thread.py", line 31, in <module>
        parent_thread()
      File "E:/test_thread.py", line 25, in parent_thread
        1/0
    ZeroDivisionError: integer division or modulo by zero

    The main thread blocks at thread2.join() and only proceeds to the next statement once thread2 has finished.

    1/0 makes the main thread exit with an error. Since thread1 has daemon=True, it is killed immediately when the main thread dies unexpectedly; thread1.join() is never executed.

    6. Execution order among child threads

    6.1 Default behavior

    import threading
    import time
    
    
    def child_thread1():
        for i in range(50):
            time.sleep(1)
            print('child_thread1_running...')
    
    def child_thread2():
        for i in range(50):
            time.sleep(1)
            print('child_thread2_running...')
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        thread1.start()
        thread2.start()

        print('parent_thread_exit...')


    if __name__ == "__main__":
        parent_thread()

    Result:

    parent_thread_running...
    parent_thread_exit...
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...child_thread2_running...
    
    child_thread1_running...child_thread2_running...
    
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...
    child_thread2_running...
    child_thread2_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...
    child_thread2_running...
    child_thread1_running...child_thread2_running...
    
    child_thread2_running...child_thread1_running...
    
    child_thread1_running...
    child_thread2_running...
    child_thread2_running...
    child_thread1_running...
    child_thread1_running...
    child_thread2_running...
    child_thread2_running...
    child_thread1_running...
    ...
    ...
    [Finished in 50.1s]

    By default, the child threads run in an arbitrary interleaved order with no fixed precedence, and the main thread finishes before they do.

    6.2 Using join

    child_thread1 runs first; once it completes, child_thread2 runs; once child_thread2 completes, the main thread executes print('parent_thread_exit...') and exits:
    import threading
    import time
    
    
    def child_thread1():
        for i in range(50):
            time.sleep(1)
            print('child_thread1_running...')
    
    def child_thread2():
        for i in range(50):
            time.sleep(1)
            print('child_thread2_running...')
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        thread1.start()
        thread1.join() 
    
        thread2.start()
        thread2.join()
    
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()

    Result:

    parent_thread_running...
    child_thread1_running...
    ...
    child_thread1_running...
    child_thread1_running...
    child_thread2_running...
    child_thread2_running...
    ......
    child_thread2_running...
    parent_thread_exit...
    [Finished in 100.2s]
    child_thread2 runs first; once it completes, child_thread1 runs; once child_thread1 completes, the main thread executes print('parent_thread_exit...') and exits:
    import threading
    import time
    
    
    def child_thread1():
        for i in range(50):
            time.sleep(1)
            print('child_thread1_running...')
    
    def child_thread2():
        for i in range(50):
            time.sleep(1)
            print('child_thread2_running...')
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        
        thread2.start()
        thread2.join()
        thread1.start()
        thread1.join()
        
    
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()
    With both threads started before either join, child_thread1 and child_thread2 run concurrently in no fixed order; the main thread executes print('parent_thread_exit...') only after both have finished:
    import threading
    import time
    
    
    def child_thread1():
        for i in range(50):
            time.sleep(1)
            print('child_thread1_running...')
    
    def child_thread2():
        for i in range(50):
            time.sleep(1)
            print('child_thread2_running...')
    
    def parent_thread():
        print('parent_thread_running...')
        thread1 = threading.Thread(target=child_thread1)
        thread2 = threading.Thread(target=child_thread2)
        
        thread2.start()
        thread1.start()
        thread2.join()
        thread1.join()
        
    
        print('parent_thread_exit...')
    
    
    if __name__ == "__main__":
        parent_thread()

    7. Inter-thread communication

    Threads share global variables

    A thread is the execution unit of a process, and a process is the smallest unit to which the system allocates resources, so threads within the same process share that process's resources.

    7.1 Sharing global variables across threads

    
    
    import threading
    import time

    g_num = 100

    def work1():
        global g_num
        for i in range(3):
            g_num += 1
        print('in work1 g_num is : %d' % g_num)

    def work2():
        global g_num
        print('in work2 g_num is : %d' % g_num)

    if __name__ == '__main__':
        t1 = threading.Thread(target=work1)
        t1.start()
        time.sleep(1)
        t2 = threading.Thread(target=work2)
        t2.start()  # prints 103: work2 sees the value work1 modified, because threads share globals

    7.2 Mutex locks (Lock)

    Threads are scheduled unpredictably, and a thread may be preempted after executing only a few instructions. When several threads modify the same data concurrently without adequate protection, the program's result becomes unpredictable and the data can be corrupted ("dirty"); code with this problem is called thread-unsafe. The mutex lock (Lock) exists to prevent this: at any given moment, only the thread holding the lock may operate on the protected resource. You can create multiple locks, and any one of them can be used to guard a given resource, much as different padlocks can each secure the same door.

    import time, threading
    count=0 #shared global variable
    lock=threading.Lock() #create a lock
    def lajifenlei():
        global count #reference the global
        lock.acquire()  #acquire the lock
        try:
            count+=1
        except Exception as e:
            pass
        finally:
            lock.release() #release the lock
        time.sleep(1)
        print(count)
    
    for i in range(10):
        th = threading.Thread(target=lajifenlei,) #create a thread
        th.start() #start the thread
    while threading.active_count()!=1:  #busy-wait until only the main thread remains
        pass

    Alternatively, just as with opening and closing files, a with statement can acquire and release the lock automatically:

    import time, threading
    count=0 #shared global variable
    lock=threading.Lock() #create a lock
    def lajifenlei():
        global count #reference the global
    
        with lock: #the with statement acquires and releases the lock automatically
            count+=1
    
        time.sleep(1)
        print(count)
    
    for i in range(10):
        th = threading.Thread(target=lajifenlei,) #create a thread
        th.start() #start the thread
    while threading.active_count()!=1:  #busy-wait until only the main thread remains
        pass

    7.3 Thread synchronization with queues (queue)

    Python's queue module provides synchronized, thread-safe queue classes: the FIFO (first in, first out) Queue, the LIFO (last in, first out) LifoQueue, and the priority queue PriorityQueue. All of them implement locking primitives and can be used directly from multiple threads, which makes queues a convenient way to synchronize threads.

    Common methods in the queue module:

    • queue.qsize(): returns the approximate size of the queue.
    • queue.empty(): returns True if the queue is empty, else False.
    • queue.full(): returns True if the queue is full (full corresponds to the maxsize given at construction), else False.
    • queue.get([block[, timeout]]): removes and returns an item from the queue; timeout sets how long to wait.
    • queue.get_nowait(): equivalent to queue.get(False).
    • queue.put(item[, block[, timeout]]): puts an item into the queue; timeout sets how long to wait.
    • queue.put_nowait(item): equivalent to queue.put(item, False).
    • queue.task_done(): after finishing a piece of work, signals to the queue that the fetched task is complete.
    • queue.join(): blocks until the queue is empty and every item has been marked done, then lets other work proceed.
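A small sketch of how task_done() and join() pair up (an illustration of my own; the worker/sentinel names are not from the original):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:   # sentinel telling the worker to stop
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()      # mark this fetched item as fully processed

threading.Thread(target=worker, daemon=True).start()
for i in range(3):
    q.put(i)
q.join()                   # blocks until task_done() has been called for every put()
print(sorted(results))     # [0, 2, 4]
q.put(None)                # let the daemon worker exit cleanly
```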
    # coding: utf-8
    
    from queue import Queue
    
    # Queue is the thread-safe FIFO queue from the Python standard library; it provides a first-in, first-out data structure suitable for passing messages between producer and consumer threads
    def test_queue():
    
        q=Queue(10)
        for i in range(5):
            q.put(i)
        while not q.empty():
            print(q.get())
    
    def test_LifoQueue():
        import queue
        # queue.LifoQueue()  # last in, first out -> a stack
        q = queue.LifoQueue(3)
        q.put(1)
        q.put(2)
        q.put(3)
        print(q.get())
        print(q.get())
        print(q.get())
    
    def test_PriorityQueue():
        import queue
        # queue.PriorityQueue()  # priority queue
        q = queue.PriorityQueue(3)  # priorities are numeric; the smaller the number, the higher the priority
        q.put((10, 'a'))
        q.put((-1, 'b'))
        q.put((100, 'c'))
        print(q.get())
        print(q.get())
        print(q.get())
    
    
    # A producer/consumer concurrency example built on queue.Queue
    
    from queue import Queue  # Queue在3.x中改成了queue
    import random
    import threading
    import time
    from threading import Thread
    
    class Producer(threading.Thread):
        """
        Producer thread 制作线程
        """
        def __init__(self, t_name, queue):  # 传入线程名、实例化队列
            threading.Thread.__init__(self, name=t_name)  # t_name即是threadName
            self.data = queue
    
        """
        run方法 和start方法:
        它们都是从Thread继承而来的,run()方法将在线程开启后执行,
        可以把相关的逻辑写到run方法中(通常把run方法称为活动[Activity]);
        start()方法用于启动线程。
        """
    
        def run(self):
            for i in range(5):  # produce five items, numbered 0-4
                print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i))  # produce item i at the current time
                self.data.put(i)  # put the item number into the queue
                time.sleep(random.randrange(10) / 5)  # rest for a random interval
            print("%s: %s producing finished!" % (time.ctime(), self.getName()))  # production finished
    
    
    class Consumer(threading.Thread):
        """
        Consumer thread 消费线程,感觉来源于COOKBOOK
        """
        def __init__(self, t_name, queue):
            threading.Thread.__init__(self, name=t_name)
            self.data = queue
    
        def run(self):
            for i in range(5):
                val = self.data.get()
                print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val))  # 编号d队列已经被消费
                time.sleep(random.randrange(10))
            print("%s: %s consuming finished!" % (time.ctime(), self.getName()))  # 编号d队列完成消费
    
    
    def main():
        """
        Main thread 主线程
        """
        queue = Queue()  # 队列实例化
        producer = Producer('Pro.', queue)  # 调用对象,并传如参数线程名、实例化队列
        consumer = Consumer('Con.', queue)  # 同上,在制造的同时进行消费
        producer.start()  # 开始制造
        consumer.start()  # 开始消费
        """
        join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
       join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。
        """
        producer.join()
        consumer.join()
        print('All threads terminate!')
    
    
    
    if __name__=="__main__":
    
        test_queue()
    
        print("=====后进先出=====")
        test_LifoQueue()
    
        print("=====优先级======")
        test_PriorityQueue()
    
        main()

    7.4 threading.local
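The original leaves this section empty. As a sketch of what threading.local does: it gives each thread its own independent copy of any attribute stored on it (the names here are my own illustration).

```python
import threading

local = threading.local()
seen = {}

def worker(value):
    local.x = value   # each thread sets its *own* x; other threads never see it
    seen[threading.current_thread().name] = local.x

threads = [threading.Thread(target=worker, args=(i,), name="t%d" % i)
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)  # {'t0': 0, 't1': 1, 't2': 2}
```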

    II. Thread Pool Implementation

    References:

    • https://www.cnblogs.com/zhang293/p/7954353.html
    • https://www.cnblogs.com/shuai1991/p/11224919.html
    • https://www.jianshu.com/p/b9b3d66aa0be

    ThreadPoolExecutor basics

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    # the times parameter simulates the duration of a network request
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    # submit() hands the function to the pool and returns immediately without blocking
    task1 = executor.submit(get_html, (3))
    task2 = executor.submit(get_html, (2))
    # done() reports whether the task has completed
    print(task1.done())
    # cancel() cancels a task; it only succeeds if the task has not started running yet
    print(task2.cancel())
    time.sleep(4)
    print(task1.done())
    # result() fetches the task's return value
    print(task1.result())
    
    # Output:
    # False  # task1 has not finished yet
    # False  # cancelling task2 failed: it is already running in the pool
    # get page 2s finished
    # get page 3s finished
    # True   # printed after "get page 3s finished", so task1 has completed by now
    # 3      # task1's return value
    • When constructing a ThreadPoolExecutor, the max_workers parameter sets the maximum number of threads that can run concurrently in the pool.
    • submit() hands a task (function and arguments) to the pool and returns a handle (a Future) for it, much like a file handle; note that submit() does not block, it returns immediately.
    • The handle's done() method reports whether the task has finished. In the example above, task1 is not done when checked right after submission (its job takes 3s), but it is done when checked again after the 4s sleep.
    • cancel() cancels a submitted task, but only if it has not started running in the pool. Here the pool size is 2, so both tasks start immediately and cancellation fails. With a pool size of 1, task1 would run first while task2 waits in the queue, and cancelling task2 would then succeed.
    • result() fetches the task's return value; inspecting the implementation shows that this call is blocking.
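The cancellation case in the last bullet can be demonstrated with a pool of size 1 (a sketch adapted from the example above; the 1-second timing is my own):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=1)
task1 = executor.submit(get_html, 1)
task2 = executor.submit(get_html, 1)  # still queued: the only worker is busy with task1
cancelled = task2.cancel()            # a queued, not-yet-running task can be cancelled
print(cancelled)       # True
print(task1.result())  # 1
executor.shutdown()
```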

    as_completed

    The methods above let you check whether a task has finished, but you don't want the main thread to sit in a polling loop. Usually you want to be notified when some task finishes and fetch its result then, rather than repeatedly checking every task. The as_completed() function yields each task's future as it completes:
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    
    # the times parameter simulates the duration of a network request
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
    all_task = [executor.submit(get_html, (url)) for url in urls]
    
    for future in as_completed(all_task):
        data = future.result()
        print("in main: get page {}s success".format(data))
    
    # Output:
    # get page 2s finished
    # in main: get page 2s success
    # get page 3s finished
    # in main: get page 3s success
    # get page 4s finished
    # in main: get page 4s success

    as_completed() is a generator: it blocks while no task has finished, yields a task's future when one completes (letting the body of the for loop run), then blocks again, looping until all tasks are done. As the output shows, tasks that finish first notify the main thread first.

    map

    Besides as_completed, you can also use executor.map, with one important difference.

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    # the times parameter simulates the duration of a network request
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
    
    for data in executor.map(get_html, urls):
        print("in main: get page {}s success".format(data))
    # Output:
    # get page 2s finished
    # get page 3s finished
    # in main: get page 3s success
    # in main: get page 2s success
    # get page 4s finished
    # in main: get page 4s success
    With map there is no need to call submit first. Like the built-in map in the Python standard library, executor.map applies the same function to every element of a sequence; the code above calls get_html for each element of urls, distributing the calls across the pool. Unlike as_completed, the results come back in the same order as the urls list: even though the 2s task finishes first, the 3s task's completion is reported first in main, and only then the 2s task's.

    wait

    wait() blocks the main thread until a given condition is satisfied.

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    import time
    
    # the times parameter simulates the duration of a network request
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
    all_task = [executor.submit(get_html, (url)) for url in urls]
    wait(all_task, return_when=ALL_COMPLETED)
    print("main")
    # Output:
    # get page 2s finished
    # get page 3s finished
    # get page 4s finished
    # main
    wait() takes three arguments: the sequence of tasks to wait for, a timeout, and a wait condition. The condition return_when defaults to ALL_COMPLETED, meaning wait until every task has finished; the output confirms that "main" is printed only after all tasks complete. The condition can also be set to FIRST_COMPLETED, which stops waiting as soon as the first task finishes.
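A quick sketch of the FIRST_COMPLETED condition (an illustration of my own; the 0.2s/2s timings are assumptions chosen so the fast task clearly finishes first):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, t) for t in (0.2, 2)]
# returns as soon as the first task finishes; done/not_done are snapshot sets
done, not_done = wait(all_task, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))   # 1 1 -> only the 0.2s task has finished
print(list(done)[0].result())     # 0.2
executor.shutdown()
```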
     
    Communication between ThreadPoolExecutor tasks
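The original leaves this section empty. One common pattern, sketched here as an assumption about what was intended, is to pass a thread-safe queue.Queue between tasks running in the pool:

```python
from concurrent.futures import ThreadPoolExecutor
import queue

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)     # Queue handles its own locking, so this is thread-safe
    q.put(None)      # sentinel telling the consumer to stop

def consumer():
    total = 0
    while True:
        item = q.get()
        if item is None:
            break
        total += item
    return total

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer)
    total = executor.submit(consumer).result()
print(total)  # 10
```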
    III. A Multithreaded Crawler
     
    Example: scraping the comments on a Bilibili video

     1. Plain multithreading (using threading.local)

    import re
    import requests
    import json
    import threading
    import math
    import time
    
    HEADERS = {#'Accept':"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        'User-Agent': 'Mozilla/5.0 (Linux; U; Android 7.1.1; zh-cn; MI 6 Build/NMF26X) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30',
    
    }
    
    local = threading.local()
    
    # Thread-local state to stored information on locks already acquired
    
    def start_urls(total_page):
        # producer: build the list of URL tasks to be consumed
        tasks = list()
        url = "https://api.bilibili.com/x/v2/reply?jsonp=jsonp&pn={}&type=1&oid=455312953&sort=2&_=1587372277524"
        for i in range(1,total_page+1):
            tasks.append(url.format(i))
        return tasks
    
    def init_start():
        # fetch the total number of comment pages
        url = "https://api.bilibili.com/x/v2/reply?jsonp=jsonp&pn=1&type=1&oid=455312953&sort=2&_=1587372277524"
        content = downloader(url)
        data = json.loads(content.text)
        total_page = math.ceil(int(data['data']['page']['count'])/int(data['data']['page']['size']))
        print(total_page)
        return total_page
    
    def downloader(url):
        # download task
        content = requests.get(url,headers=HEADERS)
        return content
    
    def work(tasks,n):
        # consumer
        while len(tasks):
            time.sleep(1)
            try:
                local.url = tasks.pop()
            except Exception as e:
                print('e',e)
                continue
            print(local.url,threading.current_thread())
            data = downloader(local.url)
    
    
    if __name__ == '__main__':
        total_page = init_start()
        task_urls = start_urls(total_page)
        for i in range(3):
            t = threading.Thread(target=work,args=(task_urls,i))
            t.start()

    2. Thread-pool version

    import re
    import requests
    import json
    import threading
    import math
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    HEADERS = {#'Accept':"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        'User-Agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
    
    }
    
    local = threading.local()
    
    # Thread-local state to stored information on locks already acquired
    
    def start_urls(total_page):
        # producer: build the list of URL tasks to be consumed
        tasks = list()
        url = "https://api.bilibili.com/x/v2/reply?jsonp=jsonp&pn={}&type=1&oid=455312953&sort=2&_=1587372277524"
        for i in range(1,total_page+1):
            tasks.append(url.format(i))
        return tasks
    
    def init_start():
        # fetch the total number of comment pages
        url = "https://api.bilibili.com/x/v2/reply?jsonp=jsonp&pn=1&type=1&oid=455312953&sort=2&_=1587372277524"
        content = downloader(url)
        data = json.loads(content.text)
        total_page = math.ceil(int(data['data']['page']['count'])/int(data['data']['page']['size']))
        print(total_page)
        return total_page
    
    def downloader(url):
        # download task
        content = requests.get(url,headers=HEADERS)
        print(content.status_code,type(content.status_code))
        return content
    
    
    def work(tasks,n):
        # consumer
        while len(tasks):
            time.sleep(1)
            try:
                local.url = tasks.pop()
            except Exception as e:
                print('e',e)
                continue
            print(local.url,threading.current_thread())
            data = downloader(local.url)
    
    
    if __name__ == '__main__':
        total_page = init_start()
        task_urls = start_urls(total_page)
    
        with ThreadPoolExecutor(max_workers=5) as executor:
            all_task = [executor.submit(work, task_urls,i) for i in range(3)]
            for future in as_completed(all_task):
                data = future.result()
                print("in main: get page {}s success".format(data))
  • Original article: https://www.cnblogs.com/qiu-hua/p/12674325.html