zoukankan      html  css  js  c++  java
  • python 多线程 (线程池的几种使用)(2)

    参考自定义线程池:https://www.cnblogs.com/zhang293/p/7954353.html

    原密解析: https://www.jianshu.com/p/b9b3d66aa0be

    最佳线程数的获取:

    1、通过用户慢慢递增来进行性能压测,观察QPS(即每秒的响应请求数,也即是最大吞吐能力。),响应时间

    2、根据公式计算:服务器端最佳线程数量=((线程等待时间+线程cpu时间)/线程cpu时间) * cpu数量

    3、单用户压测,查看CPU的消耗,然后直接乘以百分比,再进行压测,一般这个值的附近应该就是最佳线程数量。

    方式 1 :(推荐)

     线程的监听:

    pools = ThreadPoolExecutor(5)
    def testKafkaConsumerMutil():
        consumer = KafkaConsumer("Sea",bootstrap_servers = ['192.168.18.129:9092'],
                            group_id ='test0',
                            auto_offset_reset ='earliest',
                            max_poll_records=100,
                            consumer_timeout_ms=30000,
                            session_timeout_ms=30000, 
                            enable_auto_commit =False)
        for msg in consumer:
            offset = msg.offset
            print("offset"+str(offset))
            data= json.loads(msg.value)
            pools.submit(printt,data)
            #监听当前线程池的线程数量
            print("qsize"+str(pools._work_queue.qsize()))
            if pools._work_queue.qsize()>=6:
                pass
                time.sleep(30)
            consumer.commit()



    def printt(value):
    #     print(value)
        time.sleep(2)
        pass
    ThreadPoolExecutor  返回为future(T)
    '''
    Created on 2019年10月11日
    
    @author: sea
    '''
    # -*- coding: utf-8 -*-
    from concurrent.futures import ThreadPoolExecutor
    import time
    from com.sea.email.MailUtils import send
    '''
    ThreadPoolExecutor中的submit()
    <T> Future<T> submit(Callable<T> callable);
    <T> Future<T> submit(Runnable var1, T result);
    Future<?> submit(Runnable runnable);
    '''
    print(" 线程池 ThreadPoolExecutor 的使用")
    def sayhello(a):
        print("hello  "+a)
    #     time.sleep(1)
        return "结果 nihao : "+a
    
    
    
    def test1():#submit()
        pool = ThreadPoolExecutor(3)
        submit1 = pool.submit(sayhello,("sea"))  #方法名 ,参数() 如果有多个参数,直接写多个就好,eg : say(a,b)----->pool.submit(say,"aaa","bbb")
    #     submit1 = pool.submit(sayhello,"sea")  #方法名 ,参数()
    
        submit2 = pool.submit(sayhello,("sea2"))
        submit3 = pool.submit(sayhello,("sea3"))
        submit4 = pool.submit(sayhello,("sea4"))
        
        print(submit4.result())#打印应返回值  阻塞-直到返回结果
        print(submit1.result())
        print(submit2.result())
        print(submit3.result())
        print("over")
    
    def test2():#map() 
        seed=["a","b","c"]
        pool = ThreadPoolExecutor(3)
        rsultList = pool.map(sayhello,seed)#返回值为list 结果集
        for result in rsultList:
            print(result)#打印应返回值
        print("over")
    
    #     with ThreadPoolExecutor(3) as executor1:
    #         executor1.map(sayhello,seed)
    if __name__ == '__main__':
    #     test1()
        test2()
        
        
      

    concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

    2.1、map可以保证输出的顺序, submit输出的顺序是乱的

    2.2、如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()

    2.3、submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

    3.现在?

    这里要考虑一个问题,以上两种线程池的实现都是封装好的,任务只能在线程池初始化的时候添加一次,那么,假设我现在有这样一个需求,需要在线程池运行时,再往里面添加新的任务(注意,是新任务,不是新线程),那么要怎么办?

    方式2 :python3的vthread库
    可以试试python3的vthread库
    import vthread
    
    @vthread.pool(6)
    def some(a,b,c):
        import time;time.sleep(1)
        print(a+b+c)
    
    for i in range(10):
        some(i,i,i)
    
    分组线程池
    import vthread
    
    @vthread.pool(6)
    def some1(a,b,c):
        import time;time.sleep(1)
        print("some1",a+b+c)
    
    @vthread.pool(3,1)
    def some2(a,b,c):
        import time;time.sleep(1)
        print("some2",a*b*c)
    
    for i in range(10):
        some1(i,i,i)
        some2(i,i,i)
    
    加锁或其他操作,help()一下基本就能看懂。

    方式3 :(比较老)

    使用threadpool模块  具体使用方式如下:

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import threadpool
    import time
    
    def sayhello (a):
        print("hello: "+a)
        time.sleep(2)
    
    def main():
        global result
        seed=["a","b","c"]
        start=time.time()
        task_pool=threadpool.ThreadPool(5)
        requests=threadpool.makeRequests(sayhello,seed)
        for req in requests:
            task_pool.putRequest(req)
        task_pool.wait()
        end=time.time()
        time_m = end-start
        print("time: "+str(time_m))
        start1=time.time()
        for each in seed:
            sayhello(each)
        end1=time.time()
        print("time1: "+str(end1-start1))
    
    if __name__ == '__main__':
        main()
  • 相关阅读:
    第三讲:增长全局观
    搭建安卓环境
    ~~
    天气阴
    天气晴
    Spark性能优化指南——高级篇
    Ceph Jewel 10.2.3 环境部署
    《你只是看起来很努力》--读书笔记
    博客园基础环境配置
    Spark 1.3.0 单机安装
  • 原文地址:https://www.cnblogs.com/lshan/p/11654495.html
Copyright © 2011-2022 走看看