zoukankan      html  css  js  c++  java
  • 多线程和多进程2

    一. 线程同步之信号量--semaphore
     
    作用:控制进入数量的锁
    举个例子:
    写文件的时候,一般只用于一个线程写;读文件的时候可以用多个线程读,我们可以用信号量来控制多少个线程读文件
     
    做爬虫的时候,也可以用信号量来控制并发数量,以免访问量过多而被反爬,如下面代码
    import threading
    import time
     
    # 模拟2秒钟抓取一个html
    class HtmlSpider(threading.Thread):
        def __init__(self, url):
            super().__init__()
            self.url = url
     
        def run(self):
            time.sleep(2)
            print("success")
     
     
    class UrlProducer(threading.Thread):
        def run(self):
            for i in range(10):
                html_thread = HtmlSpider("https://baidu.com/{}".format(i))
                html_thread.start()
     
    if __name__ == "__main__":
        url_producer = UrlProducer()
        url_producer.start()
     
    这样会生成10个线程,并一次输出10个success,那么如果想每次生成3个线程去爬取,怎么做
    import threading
    import time
     
    # 模拟2秒钟抓取一个html
    class HtmlSpider(threading.Thread):
        def __init__(self, url, sem):
            super().__init__()
            self.url = url
            self.sem = sem
     
        def run(self):
            time.sleep(2)
            print("success")
            self.sem.release()  # 第三步:在这里释放锁,因为线程里运行的是爬虫
     
     
    class UrlProducer(threading.Thread):
        def __init__(self, sem):
            super().__init__()
            self.sem = sem
     
        def run(self):
            for i in range(10):
                self.sem.acquire()  # 第二步:获得锁,每获得一个锁信号量中的值就减一。获得3个锁时暂停程序,等待锁释放,看Semaphore源码
                html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
                html_thread.start()
     
    if __name__ == "__main__":
        sem = threading.Semaphore(3)  # 第一步,设置3个并发
        url_producer = UrlProducer(sem)
        url_producer.start()
     
     
     
     
     
    二.ThreadPoolExcutor线程池
     
    为什么要用线程池
    1)线程池提供一个最大线程允许的数量,当任务请求过多而超过线程池最大值时,就会造成阻塞。这个功能信号量也能做到
    2)线程池允许主线程中获得某一个线程的状态,或者某一个任务的状态以及返回值
    3)当一个线程完成时,主线程能立即知道
    4)futures模块可以让多线程和多进程编码接口一致,如果想把多线程切换为多进程就会很方便
     
    例子1
    from concurrent.futures import ThreadPoolExecutor
    import time
     
    def get_html(times):
        time.sleep(times)
        print("use {} to success".format(times))
        return "运行的时间是{}秒".format(times)
     
    executor = ThreadPoolExecutor(max_workers=2)  # 生成一个线程池对象,设置线程池里同时运行的数量
     
    # 通过submit函数提交执行的函数到线程池中,返回一个Future对象
    task1 = executor.submit(get_html, (2))  #(2)为函数get_html中的参数值
    task2 = executor.submit(get_html, (1))
     
    # 返回对象的done方法可用于判断任务是否执行成功,并且是立即执行,这里用task1为例子
    print(task1.done())
    time.sleep(3) #等待3秒后,在用done方法测试,结果为True.  可能是pychram内部计算问题,这里不能写2,否则会显示False
    print(task1.done())
    print(task1.result()) #result方法可获得get_html函数的返回值
    输出为
    False
    use 1 to success
    use 2 to success
    True
    运行的时间是2秒
     
     
    例子2:获取已经成功的任务返回,可以用as_completed库
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
     
    def get_html(times):
        time.sleep(times)
        print("use {} to success".format(times))
        return "我运行的时间是{}秒".format(times)
     
    executor = ThreadPoolExecutor(max_workers=2)  # 设置线程池里同时运行的数量
     
    # 模拟各线程爬取时间为urls列表
    urls = [3, 4, 9, 7]
    all_task = [executor.submit(get_html, (url)) for url in urls]
    for future in as_completed(all_task):
        data = future.result()
        print(data)
    按每一个线程完成时,分4步输出下面信息
    use 3 to success
    我运行的时间是3秒
    use 4 to success
    我运行的时间是4秒
    use 7 to success
    我运行的时间是7秒
    use 9 to success
    我运行的时间是9秒
    注意:不管urls列表中爬取时间顺序如何,主线程的输出都是按照时间先后顺序输出的
     
     
    例子3,通过executor获取已经完成的task,需要用到map函数
    from concurrent.futures import ThreadPoolExecutor
    import time
     
    def get_html(times):
        time.sleep(times)
        print("use {} to success".format(times))
        return "我运行的时间是{}秒".format(times)
     
    executor = ThreadPoolExecutor(max_workers=2)  # 设置线程池里同时运行的数量
     
    # 模拟各线程爬取时间为urls列表
    urls = [3, 4, 9, 7]
     
    # 通过executor获取已经完成的task, 使用map(),和python中的map函数类似
    for data in executor.map(get_html, urls):
        print(data)
    输出
    use 3 to success
    我运行的时间是3秒
    use 4 to success
    我运行的时间是4秒
    use 7 to success
    use 9 to success
    我运行的时间是9秒
    我运行的时间是7秒
    注意:主线程的输出顺序和urls列表中的时间顺序一样,和上面的例子注意区分
     
     
    例子4:wait函数,可用于等待某一个任务完成时,输出自定义状态,在例子2的基础上修改如下
    from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
    import time
     
    def get_html(times):
        time.sleep(times)
        print("use {} to success".format(times))
        return "我运行的时间是{}秒".format(times)
     
    executor = ThreadPoolExecutor(max_workers=2)  # 设置线程池里同时运行的数量
     
    # 模拟各线程爬取时间为urls列表
    urls = [3, 4, 9, 7]
    all_task = [executor.submit(get_html, (url)) for url in urls]
     
    # 添加wait函数,其中的return_when表示第一个线程完成时执行下一行代码
    wait(all_task, return_when=FIRST_COMPLETED)
    print("main")
    for future in as_completed(all_task):
        data = future.result()
        print(data)
    输出
    use 3 to success
    main
    我运行的时间是3秒
    use 4 to success
    我运行的时间是4秒
    use 7 to success
    我运行的时间是7秒
    use 9 to success
    我运行的时间是9秒
     
    注意输出中main的位置,看下wait的源码理解下return_when
     
     
     
     
     
    三. ThreadPoolExecutor源码分析
     
    1. 未来对象:Future对象
    from concurrent.futures import Future:   主要用于作为task的返回容器
     
    现在感觉看不懂,以后再看
     
     
     
     
     
    四. 多进程和多线程对比
     
    因为多线程只能使用1个核。如果想提高效率,使用多个核运行程序,可以用多进程编程
    对于I/O操作,一般使用多线程编程,因为进程切换代价要高于线程
     
    例子1:斐波那契的计算是一个耗CPU的操作,下面用线程和进程执行一个10个数的斐波那契值所需要的时间
     
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
     
     
    def fib(n):
        if n<=2:
            return 1
        return fib(n-1)+fib(n-2)
     
    if __name__ == "__main__":
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(fib, (num)) for num in range(25, 35)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                #print("fib值为: {}".format(data))
     
            print("多线程用时:{}".format(time.time()-start_time))
     
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(fib, (num)) for num in range(25, 35)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                #print("fib值为: {}".format(data))
     
            print("多进程用时 :{}".format(time.time()-start_time))
     
    输出结果
    多线程用时:4.100145578384399
    多进程用时 :2.9355385303497314
     
     
    例子2:I/O操作的例子
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
     
     
    def random_sleep(n):
        time.sleep(n)
        return n
     
    if __name__ == "__main__":
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
     
            print("多进程用时 :{}".format(time.time()-start_time))
     
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
     
            print("多进程用时 :{}".format(time.time()-start_time))
    输出结果如下
    多进程用时 :20.17968201637268
    多线程用时 :20.00802493095398
    虽然差别不大,还是线程快点,而且线程比进程耗费的内存少的多
     
     
     
     
     
    五. 多进程编程
     
    1. 先来了解下fork函数
    fork在linux中用于创建子进程,不能在windows中使用,如下代码存在一个文件比如1.py中
    import os
    import time
     
    pid = os.fork()
    print("jack")
    if pid == 0:
        print('子进程 {} ,父进程是: {}.' .format(os.getpid(), os.getppid()))
    else:
        print('我是父进程:{}.'.format(pid))
     
    time.sleep(2)
    输出结果
    jack
    我是父进程:1016.
    jack
    子进程 1016 ,父进程是: 1015.
     
    说明:
    1)执行文件1.py时,会生成一个主进程;代码里的fork()又创建了一个子进程,pid不会是0。所以会先输出前两行的内容
    2)1.py的主进程执行完后,会执行里面的子进程,它会复制os.fork()后的所有代码,重新执行一次,所有得到后两行输出
     
     
    2. multiprocessing来实现多进程,比ProcessPoolExecutor更底层
    import multiprocessing
    import time
     
    def get_html(n):
        time.sleep(n)
        print("sub_progress sucess")
        return n
     
    if __name__ == "__main__":
        # 使用mulproccessing中的线程池
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        result = pool.apply_async(get_html, args=(3,))  # 这里的3是给get_html的参数设置为3秒
     
        # 等待所有任务完成
        pool.close()  #要先把进程池关闭,否则会抛异常
        pool.join()
        print(result.get())
    输出结果
    sub_progress sucess
    3
     
     
    3. imap,对应线程中的map,按列表中的时间顺序输出
    import multiprocessing
    import time
     
    def get_html(n):
        time.sleep(n)
        print("sub_progress sucess")
        return n
     
    if __name__ == "__main__":
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        for result in pool.imap(get_html, [1, 5, 3]):  #result为get_html的返回值
            print("{} success".format(result))
    输出结果
    sub_progress sucess
    1 success
    sub_progress sucess
    sub_progress sucess
    5 success
    3 success
     
     
    4. imap_unordered方法,按执行时间先后顺序输出
    import multiprocessing
    import time
     
    def get_html(n):
        time.sleep(n)
        print("sub_progress sucess")
        return n
     
    if __name__ == "__main__":
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        for result in pool.imap_unordered(get_html, [1, 5, 3]):
            print("{} sleep success".format(result))
    输出结果
    sub_progress sucess
    1 sleep success
    sub_progress sucess
    3 sleep success
    sub_progress sucess
    5 sleep success
     
     
     
     
     
    六. 进程间的通信:Queue,Pipe, Manager
     
    1. 使用multiprocessing中的Queue进行通信
    import time
    from multiprocessing import Process, Queue
     
     
    def producer(queue):
        queue.put("a")
        time.sleep(2)
     
    def consumer(queue):
        time.sleep(2) #需等待producer执行完再拿数据
        data = queue.get()
        print(data)
     
    if __name__ == "__main__":
        queue = Queue(5)
        my_producer = Process(target=producer, args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    输出a,说明进程my_producer和进程my_consumer间通信成功
     
     
    2. 进程池中的进程间通信需要使用Manager实例化中的Queue
    import time
    from multiprocessing import Pool, Manager
     
     
    def producer(queue):
        queue.put("a")
        time.sleep(2)
     
    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)
     
    if __name__ == "__main__":
        queue = Manager().Queue(5)  # 使用Manage实例化后的Queue
        pool = Pool(2)
     
        pool.apply_async(producer, args=(queue,))
        pool.apply_async(consumer, args=(queue,))
     
        pool.close()
        pool.join()
    输出同样是a,表示通信成功
     
     
    3. 使用pipe实现进程间通信
    from multiprocessing import Process, Pipe
     
     
    def producer(pipe):
        pipe.send("a")
     
    def consumer(pipe):
        print(pipe.recv())
     
    if __name__ == "__main__":
        recevie_pipe, send_pipe = Pipe()
        # pipe只能用于2个进程间的通信
        my_producer = Process(target=producer, args=(send_pipe,))
        my_consumer = Process(target=consumer, args=(recevie_pipe,))
     
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    输出同样是a
    注意:pipe只能用于2个进程间的通信,pipe的性能是高于queue的
     
     
    4. 进程间使用共享内存
    from multiprocessing import Manager, Process
     
    def add_data(p_dict, key, value):
        p_dict[key] = value
     
    if __name__ == "__main__":
        progress_dict = Manager().dict()
        first_progress = Process(target=add_data, args=(progress_dict, "jack", 22))
        second_progress = Process(target=add_data, args=(progress_dict, "hong", 34))
     
        first_progress.start()
        second_progress.start()
        first_progress.join()
        second_progress.join()
     
        print(progress_dict)
    输出结果
    {'jack': 22, 'hong': 34}
     
    说明:
    1)结果显示2个进程间的数据合并了,都写入了主进程中的同一个内存中
    2)本例子是用dict来做说明,其实Manager()里还有list, tuple等数据结构都可以使用
     
     
  • 相关阅读:
    html+css学习
    mac安装软件系列
    Versions 出现 SVN Working Copy xxx locked
    linux安装gcc-c++
    linux常用命令
    linux挂载磁盘
    对jquery新增加的class绑定事件
    linux下安装php的mcrypt拓展
    Linux关闭selinux
    linux安装包资源库
  • 原文地址:https://www.cnblogs.com/regit/p/9881209.html
Copyright © 2011-2022 走看看