zoukankan      html  css  js  c++  java
  • python_进程池以及线程池

    可以重复利用的线程

    直接上代码

    from threading import Thread, current_thread
    from queue import Queue
    # 重写线程类
    class MyThread(Thread):
        def __init__(self):
            super().__init__()
            self.daemon = True  # 守护线程
            self.queue = Queue(10)
            self.start()  # 实例化的时候开启线程
    
        def run(self):  # 子线程只有这一个线程, 从队列里面拿任务
            while True:
                task, args, kwargs = self.queue.get()  # 拿任务 也是元组
                task(*args, **kwargs)  # 可能有,可能没有,所有传入不定长参数
                self.queue.task_done()   # 结束任务
    
        def apply_async(self, func, args=(), kwargs={}):   # 自写任务,不是重写任务, 充当生产者, 给线程提供任务(把任务扔到队列)
            self.queue.put((func, args, kwargs))
    
        def join_R(self):   # 主线程等待子线程结束
            self.queue.join()   # task_done 为0 的时候就阻塞
    
    
    def func():
        print(1, current_thread())
    
    def func2(*args, **kwargs):
        print(2, current_thread())
        print('func: ', args, kwargs)
    
    t = MyThread()
    t.apply_async(func)
    t.apply_async(func2, args=(1,2), kwargs={'a':1, 'b':2})
    print("任务提交完成")
    t.join_R()
    print("任务完成")

    结果:

    任务提交完成
    1 <MyThread(Thread-1, started daemon -1223214272)>
    2 <MyThread(Thread-1, started daemon -1223214272)>
    func:  (1, 2) {'a': 1, 'b': 2}
    任务完成    任务完成后,主线程就开始退出, 因此守护线程被杀死

    线程池的简单实现

    池的概念

    主线程: 相当于生产者,只管向线程池提交任务。
                   并不关心线程池是如何执行任务的。
                   因此,并不关心是哪一个线程执行的这个任务。
    线程池: 相当于消费者,负责接收任务,
                   并将任务分配到一个空闲的线程中去执行。

    代码实现如下:

    from threading import Thread, current_thread
    from queue import Queue
    
    class T_pool:
        def __init__(self, n):  # 准备多少个池
            super().__init__()
            self.queue = Queue()
            for i in range(n):  # 在池里开多少个线程
                Thread(target=self.fun, daemon=Thread).start()    # 守护进程 并启动
    
        def fun(self):   # 生产者
            while True:
                task = self.queue.get()
                task()
                self.queue.task_done()
    
        def apply_async(self, task):   # 消费者
            self.queue.put(task)
    
        def join(self):
            self.queue.join()
    
    def func():
        print(current_thread())
    
    def func2():
        print(current_thread())
    
    p = T_pool(2)
    p.apply_async(func)
    p.apply_async(func2)
    p.join()

    结果:

    <Thread(Thread-1, started daemon -1223324864)>
    <Thread(Thread-1, started daemon -1223324864)>

    Python自带的池

    内置线程池

    from multiprocessing.pool import ThreadPool     # 线程池
    from multiprocessing import pool  # 进程池
    # 内置线程池
    def fun(*args, **kwargs):
        print(args, kwargs)
    
    p = ThreadPool(2)   # 直接使用内置的
    p.apply_async(fun, args=(1,2), kwds={'a':1})
    p.close()   # 要求:在join前必须要close,这样就不允许再提交任务了
    p.join()

    结果:

    (1, 2) {'a': 1}

    内置进程池

    from multiprocessing import Pool  # 进程池
    # 内置进程池
    def fun(*args, **kwargs):
        print(args, kwargs)
    
    if __name__ == '__main__': # 必须要有一个main测试
        p = Pool(2)   # pool的实例化必须在main测试之下
        p.apply_async(fun, args=(1,2), kwds={'a':1})
        p.close()   # 要求:在join前必须要close,这样就不允许再提交任务了
        p.join()

    结果:

    (1, 2) {'a': 1}

    池的其他操作
    操作一: close - 关闭提交通道,不允许再提交任务
    操作二: terminate - 中止进程池,中止所有任务
    操作三: 结果操作

    结果操作

    from multiprocessing.pool import ThreadPool
    import time
    def func(n):
        if n == 1:
            return 1
        elif n == 2:
            return 2
        return func(n-1) + func(n-2)
    
    pool = ThreadPool()
    
    a_result = pool.apply_async(func, args=(35,))
    print("note1:",time.asctime(time.localtime(time.time())))
    result = a_result.get() # 会阻塞,知道结果产生了
    print("note2:",time.asctime(time.localtime(time.time())))

    结果:

    note1: Mon Sep 17 00:07:31 2018
    note2: Mon Sep 17 00:07:34 2018

     

    使用池来实现并发服务器

    使用线程池来实现并发服务器

    import socket
    from multiprocessing.pool import ThreadPool  # 线程池
    from multiprocessing import Pool, cpu_count
    '''
    使用线程池来实现
    并发服务器
    '''
    print(cpu_count())
    
    server = socket.socket()
    server.bind(('0.0.0.0', 8080))
    server.listen(1000)
    
    def work_thread(conn):
        while True:
            data = conn.recv(1000)
            if data:
                print(data)
                conn.send(data)
    
            else:
                conn.close()
                break
    
    if __name__ == '__main__':
    
        t_pool = ThreadPool(5)  # 使用线程池, 通常分配2倍的cpu个数
        while True:
            conn,addr = server.accept()
            t_pool.apply_async(work_thread, args=(conn,))  # 接收的是个任务, conn做为参数

    使用进程池来实现并发服务器

    import socket
    from multiprocessing.pool import ThreadPool  # 线程池
    from multiprocessing import Pool, cpu_count
    '''
    使用进程池来实现
    并发服务器
    '''
    print(cpu_count())
    
    server = socket.socket()
    server.bind(('0.0.0.0', 9000))
    server.listen(1000)
    
    def work_process(server):
        t_pool = ThreadPool(cpu_count()*2)  # 使用线程池, 通常分配2倍的cpu个数
        while True:
            conn,addr = server.accept()
            t_pool.apply_async(work_thread, args=(conn,))  # 接收的是个任务, conn做为参数
    
    def work_thread(conn):
        while True:
            data = conn.recv(1000)
            if data:
                print(data)
                conn.send(data)
    
            else:
                conn.close()
                break
    
    
    n = cpu_count()  # 获取当前计算机的CPU核心数量
    p = Pool(n)
    for i in range(n):  # 充分利用CPU, 为每个CPU分配一个进程
        p.apply_async(work_process, args=(server,))
    
    p.close()
    p.join()

    客户端:

    import socket
    
    click = socket.socket()
    click.connect(('127.0.0.1', 8888))
    
    while True:
         data = input("请输入你要发送的数据:")
         click.send(data.encode())
         print("接收到的消息: {}".format(click.recv(1024).decode()))

    总结完毕。

    作者:含笑半步颠√

    博客链接:https://www.cnblogs.com/lixy-88428977

    声明:本文为博主学习感悟总结,水平有限,如果不当,欢迎指正。如果您认为还不错,欢迎转载。转载与引用请注明作者及出处。

  • 相关阅读:
    资源加载相关
    Ubuntu 使用root登陆帐户
    安装zookeeper时,启动成功,可是状态查询未成功
    使用WinSCP远程连接虚拟机
    分布式服务管理框架-Zookeeper客户端zkCli.sh使用详解
    js怎么监听一类标签的点击事件
    js获取select标签选中的值
    学习
    druid
    Linux上非root用户jdk环境变量配置
  • 原文地址:https://www.cnblogs.com/lixy-88428977/p/9658417.html
Copyright © 2011-2022 走看看