  • Python multithreading: part 3

    Goals of this article:

              1. A server backed by a thread pool (demo)

              2. Concurrent DB inserts with a thread pool (demo)

              3. Notes on using a thread pool

              4. Thread pool source code walkthrough

     1. A socket-based server backed by a thread pool

    1. On an i7 Windows machine, the server handles 300 client requests (the client starts 3 processes, each sending 100 requests serially). To simulate real processing, the server sleeps 0.5s before replying (without the sleep, the thread pool and the plain version behave about the same). The server built on a thread pool of 10 threads took 50s; the server that creates and destroys a thread for every request took 140s.

          This comparison doesn't prove much by itself, because it isn't very realistic: on one hand the client concurrency is low, and on the other the server's response time was chosen arbitrarily. Without the 0.5s sleep the two versions take roughly the same time, since creating a single thread is not expensive. Still, the clear gap shows that the thread pool does bring a real performance advantage in this scenario (with 10 client processes each sending 100 requests, the thread-pool server still finishes within about 50s).

    import time, threading
    import socket

    host = ''
    port = 8888
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(3)


    class MyThread(threading.Thread):
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args

        def run(self):
            self.func(*self.args)


    def handle_request(conn_socket):
        recv_data = conn_socket.recv(1024)
        reply = 'HTTP/1.1 200 OK\r\n\r\n'
        reply += 'hello world'
        time.sleep(0.5)          # simulate server-side processing time
        conn_socket.send(reply)
        conn_socket.close()


    def main():
        while True:
            conn_socket, addr = s.accept()
            t = MyThread(handle_request, (conn_socket,))
            t.start()
            t.join()             # the new thread runs to completion (and is destroyed) before the next accept

    if __name__ == '__main__':
        main()
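    For completeness, here is a client sketch matching the benchmark setup described above (3 processes, each sending 100 requests serially). It is not part of the original post and uses Python 3 syntax.

    import socket
    import time
    from multiprocessing import Process

    def run_client(n_requests=100, host='127.0.0.1', port=8888):
        # each process sends its requests one after another (serially)
        for _ in range(n_requests):
            with socket.create_connection((host, port)) as c:
                c.sendall(b'GET / HTTP/1.1\r\n\r\n')
                c.recv(1024)                       # wait for the server's reply

    if __name__ == '__main__':
        start = time.time()
        procs = [Process(target=run_client) for _ in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(time.time() - start)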

    There are also hand-rolled thread pools floating around online, like the one below. If you need a thread pool for a real project, I would not recommend copying these unpolished scripts:

    #!/usr/bin/env python
    #coding=gbk
    
    import socket
    from Queue import Queue
    from threading import Thread
    import threading
    
    
    """
            重写threadpool,自定义和实现线程池,实现web服务端多线程处理
    """
    host =''
    port =8888
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.bind((host,port))
    s.listen(3)
    
    class ThreadPoolManger():
        """A simple thread pool: a shared work queue plus a fixed set of worker threads."""
        def __init__(self, thread_num):
            self.work_queue = Queue()
            self.thread_num = thread_num
            self._init_threading_pool(self.thread_num)
    
        def _init_threading_pool(self, thread_num):
            # start the fixed set of worker threads up front
            for i in range(thread_num):
                thread = ThreadManger(self.work_queue)
                thread.start()
    
        def add_job(self, func, *args):
            # enqueue a task; an idle worker will pick it up
            self.work_queue.put((func, args))
    
    
    class ThreadManger(Thread):
        """Worker thread: loops forever, pulling and running tasks from the queue."""
        def __init__(self, work_queue):
            Thread.__init__(self)
            self.work_queue = work_queue
            self.daemon = True
    
        def run(self):
            while True:
                target, args = self.work_queue.get()   # blocks until a task is available
                target(*args)
                self.work_queue.task_done()
    
    def handle_request(conn_socket):
        recv_data = conn_socket.recv(1024)
        reply = 'HTTP/1.1 200 OK\r\n\r\n'
        reply += 'hello world'
        print 'thread %s is running ' % threading.current_thread().name
        conn_socket.send(reply)
        conn_socket.close()
    
    thread_pool = ThreadPoolManger(10)
    while True:
        conn_socket,addr =s.accept()
        thread_pool.add_job(handle_request,*(conn_socket,))
    
    s.close()
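    If a thread pool is needed for real work, the standard library already ships one. Below is a minimal sketch of the same server built on concurrent.futures.ThreadPoolExecutor (Python 3, not part of the original post); the handle_request name simply mirrors the demo above.

    import socket
    import time
    from concurrent.futures import ThreadPoolExecutor

    HOST, PORT = '', 8888

    def handle_request(conn_socket):
        conn_socket.recv(1024)                 # read (and ignore) the request
        time.sleep(0.5)                        # simulate server-side work
        conn_socket.sendall(b'HTTP/1.1 200 OK\r\n\r\nhello world')
        conn_socket.close()

    def main():
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
            srv.bind((HOST, PORT))
            srv.listen(3)
            with ThreadPoolExecutor(max_workers=10) as pool:   # 10 reusable worker threads
                while True:
                    conn_socket, addr = srv.accept()
                    pool.submit(handle_request, conn_socket)   # queue the job for a pool thread

    if __name__ == '__main__':
        main()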

     2. Inserting 10,000 rows into the DB

          Setting aside the usual insertion tricks (building a single multi-row INSERT, or writing to a file and loading it with the mysql client), this is a plain comparison of ad-hoc threads versus a thread pool. Result: 10 threads created on the fly take 32s to insert 10,000 rows, while a thread pool of size 10 takes 9s. In both cases each thread holds its own DB connection and the insert logic is the same, which makes the thread pool's advantage quite clear. (Note, though, that the thread version below calls connect() inside the insert loop while the pool version connects once per task, which also contributes to the gap; a short batching sketch follows the two demos.)

    #!/usr/bin/env python
    #coding=GBK
    from mytool.mysql import MySql
    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self,func,args,name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
    
        def run(self):
            self.func(*self.args)
    
    def make_t_tcpay_list(Flistid,Ftde_id,n):
        Fnum =0.5
        Fabankid = 6222620710005743769
        Fuid = 10000256025
        wbdb_mysql = MySql("10.125.56.154", "root", "1234", "wd_db")
        for i in range(n):
            sql = "**"   # fill in your own INSERT statement
            wbdb_mysql.connect()                    # note: a new connection is opened for every insert
            wbdb_mysql.execute_insert_command(sql)
            Flistid = Flistid + 1
            Ftde_id = Ftde_id + 1
    
    if __name__ == "__main__":
    
        start = time.time()
        Threads=[]
        Flistid = 110180809100012153312110361120
        Ftde_id = 1

        for i in range(10):
            print i
            Flistid = Flistid + 1000
            Ftde_id = Ftde_id + 1000
            t = MyThread(make_t_tcpay_list, (Flistid, Ftde_id, 1000))
            Threads.append(t)
        for i in range(10):
            Threads[i].start()
        for i in range(10):
            Threads[i].join()
        end = time.time()
        print end - start

    The thread-pool version:

    #!/usr/bin/env python
    #coding=GBK
    from mytool.mysql import MySql
    import threadpool
    import time
    
    def make_t_tcpay_list(n,Flistid,Ftde_id):
        Fnum =0.5
        Fabankid = 6222620710005743769
        Fuid = 10000256025
        wbdb_mysql = MySql("10.125.56.154", "root", "root1234", "wd_db")
        wbdb_mysql.connect()
    
        for i in range(int(n)):
            sql = ""   # fill in your own INSERT statement
            wbdb_mysql.execute_insert_command(sql)
            Flistid = int(Flistid) + 1
            Ftde_id = Ftde_id + 1
    
    
    if __name__ == "__main__":
    
        start = time.time()
        lst_vars_1 = ['1000','110180809100012153314210311120',1]
        lst_vars_2 = ['1000','110180809100012153314210321120',2000]
        lst_vars_3 = ['1000','110180809100012153314210331120', 4000]
        lst_vars_4 = ['1000','110180809100012153314210341120', 6000]
        lst_vars_5 = ['1000','110180809100012153314210351120', 8000]
        lst_vars_6 = ['1000','110180809100012153314210361120', 10000]
        lst_vars_7 = ['1000', '110180809100012153314210721120', 12000]
        lst_vars_8 = ['1000', '110180809100012153314210821120', 14000]
        lst_vars_9 = ['1000', '110180809100012153314210921120', 16000]
        lst_vars_10 = ['1000', '110180809100012123314210321120', 18000]
    
        func_var = [(lst_vars_1, None), (lst_vars_2, None), (lst_vars_3, None), (lst_vars_4, None), (lst_vars_5, None), (lst_vars_6, None), (lst_vars_7, None), (lst_vars_8, None), (lst_vars_9, None), (lst_vars_10, None)]
    
        pool = threadpool.ThreadPool(10)
        requests = threadpool.makeRequests(make_t_tcpay_list, func_var)
        for req in requests:
            pool.putRequest(req)
        pool.wait()
        end = time.time()
        print end - start
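    The paragraph at the start of this section set aside batching tricks; for reference, here is a minimal sketch of a multi-row insert using executemany. It assumes a plain pymysql connection instead of the author's mytool.mysql wrapper, and the table and column names are made up to match the variables in the demos above.

    import pymysql

    def insert_rows(rows):
        # one executemany call sends a multi-row INSERT instead of one INSERT per row
        conn = pymysql.connect(host='10.125.56.154', user='root',
                               password='1234', database='wd_db')
        try:
            with conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO t_tcpay_list (Flistid, Ftde_id, Fnum) VALUES (%s, %s, %s)",
                    rows)                     # rows: iterable of (Flistid, Ftde_id, Fnum) tuples
            conn.commit()
        finally:
            conn.close()

    # e.g. insert_rows([(str(110180809100012153312110361120 + i), i, 0.5) for i in range(10000)])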

    3. Notes on using a thread pool

           The basic idea of a thread pool: put the tasks into a queue, start N threads, and have each thread take a task from the queue, run it, mark it as done, then go back for the next one, until the queue is empty and the threads exit.
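    A minimal sketch of exactly that pattern (Python 3 syntax, written for this note rather than taken from any library): N worker threads drain a shared queue and exit once it is empty.

    import queue
    import threading

    def worker(tasks):
        while True:
            try:
                func, args = tasks.get_nowait()   # grab the next task, if any
            except queue.Empty:
                return                            # queue drained -> this thread exits
            func(*args)
            tasks.task_done()                     # tell the queue this task is finished

    tasks = queue.Queue()
    for i in range(100):
        tasks.put((print, (i,)))                  # enqueue 100 toy tasks

    threads = [threading.Thread(target=worker, args=(tasks,)) for _ in range(5)]
    for t in threads:
        t.start()
    tasks.join()                                  # block until every task is done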

           The two demos above give some feel for the practical differences between a single thread, ad-hoc threads, and a thread pool. In reality, though, CPython's GIL (global interpreter lock) means that any Python thread must acquire the GIL before it can run, and the interpreter then releases it automatically every 100 bytecodes (in CPython 2; CPython 3.2+ uses a 5 ms switch interval instead). Python threads can therefore only run by taking turns: even 100 threads on a 100-core CPU can use only one core.

          For IO-bound tasks, multithreading still gives a big efficiency boost (both demos above are IO-bound), because a thread releases the GIL during a slow IO operation, other threads can then acquire it, and tasks get processed concurrently. For example, suppose a task is 20% computation and 80% IO wait and takes 5s in total: think of it as 1s of CPU time plus 4s of waiting. While the first thread waits, the CPU can switch to another thread; once that thread's CPU second is done, three more threads can be activated in the same way, and by the time the first thread's wait ends the CPU switches back and its task finishes. With 5 such threads, 5 tasks complete in roughly 10s instead of 25s. There is also a sizing rule: threads = N * (x + y) / x, where N is the number of cores, x is the per-task compute time, and y is the per-task wait time, measured by profiling a single-threaded run; this keeps CPU utilization close to maximal. Because of the GIL, Python can only use one core, so set N = 1 here.
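    Plugging the example numbers into that rule (a quick illustration, not from the original post):

    # threads = N * (x + y) / x, with N = 1 core (because of the GIL),
    # x = 1s of CPU time and y = 4s of IO wait per task
    N, x, y = 1, 1.0, 4.0
    print(N * (x + y) / x)   # -> 5.0: five threads are enough to keep the single core busy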

          For CPU-bound tasks, first look at a simple demo: summing 0 through 99,999, repeated 100 times. Single-threaded it takes 0.44s. And the thread-pool version? Worse than single-threaded: 1.2s.

    Single-threaded:

    import time
    
    def change_it(n):
        for j in range(n):
            sum =0
            for i in range(100000):
                sum = sum + i
    
    if __name__ == "__main__":
        start = time.time()
        change_it(100)
        end = time.time()
        print end - start

    Elapsed time: about 0.44s.

    Thread pool:

    #coding=GBK
    import threadpool
    import time
    
    def change_it(n):
        for j in range(n):
            sum =0
            for i in range(100000):
                sum = sum + i
    
    if __name__ == "__main__":
    
        start = time.time()
    
        func_var = [10,10,10,10,10,10,10,10,10,10]
    
        pool = threadpool.ThreadPool(10)
        requests = threadpool.makeRequests(change_it, func_var)
        for req in requests:
            pool.putRequest(req)
        pool.wait()
        end = time.time()
        print end - start

    Elapsed time: about 1.2s.

          So with multiple threads Python is actually noticeably slower than the single-threaded run (1.2s vs 0.44s here). For a CPU-bound task, multithreaded code keeps the CPU fully busy, yet every thread still has to wait its turn for the GIL; the threads spend their time contending for the lock and being switched, so performance drops instead of improving.

    4. Thread pool source code walkthrough

     1. Core class: ThreadPool

         Instance attributes:

    class ThreadPool:
        def __init__(self, num_workers, q_size=0):
            self.requestsQueue = Queue.Queue(q_size)
            self.resultsQueue = Queue.Queue()
            self.workers = []
            self.workRequests = {}
            self.createWorkers(num_workers)

       This is similar to the hand-rolled pool above; the differences are that there is an extra results queue, every worker thread created is kept in workers, and each task is wrapped into a WorkRequest object and tracked in workRequests.

        Key methods:

        def createWorkers(self, num_workers):
            """Add num_workers worker threads to the pool."""
    
            for i in range(num_workers):
                self.workers.append(WorkerThread(self.requestsQueue,
                  self.resultsQueue))
        def putRequest(self, request, block=True, timeout=0):
            """Put work request into work queue and save its id for later."""
    
            assert isinstance(request, WorkRequest)
            self.requestsQueue.put(request, block, timeout)
            self.workRequests[request.requestID] = request

    2. Worker thread class: WorkerThread

    Instance attributes:

    class WorkerThread(threading.Thread):
       
        def __init__(self, requestsQueue, resultsQueue, **kwds):
    
            threading.Thread.__init__(self, **kwds)
            self.setDaemon(1)
            self.workRequestQueue = requestsQueue
            self.resultQueue = resultsQueue
            self._dismissed = threading.Event()
            self.start()

    Key method:

        def run(self):
        
            while not self._dismissed.isSet():
                # thread blocks here, if queue empty
                request = self.workRequestQueue.get()
                if self._dismissed.isSet():
                    # if told to exit, return the work request we just picked up
                    self.workRequestQueue.put(request)
                    break # and exit
                try:
                    self.resultQueue.put(
                        (request, request.callable(*request.args, **request.kwds))
                    )
                except:
                    request.exception = True
                    self.resultQueue.put((request, sys.exc_info()))

    3. Building the request objects

    def makeRequests(callable, args_list, callback=None, exc_callback=None):
    
        requests = []
        for item in args_list:
            if isinstance(item, tuple):
                requests.append(
                  WorkRequest(callable, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
                )
            else:
                requests.append(
                  WorkRequest(callable, [item], None, callback=callback,
                    exc_callback=exc_callback)
                )
        return requests
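    From the two branches above, makeRequests accepts either (args, kwds) tuples or plain items that become a single positional argument. A short usage sketch (the add function here is only an illustration):

    import threadpool

    def add(a, b=0):
        return a + b

    requests = threadpool.makeRequests(add, [
        ((1, 2), None),         # tuple item: positional args (1, 2), no keyword args
        ((1,), {'b': 10}),      # tuple item: args (1,) plus kwds {'b': 10}
        5,                      # plain item: wrapped as a single positional arg -> add(5)
    ])
    pool = threadpool.ThreadPool(2)
    for req in requests:
        pool.putRequest(req)
    pool.wait()                 # block until all submitted requests have run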

    The request class itself is defined as:

    class WorkRequest:
        def __init__(self, callable, args=None, kwds=None, requestID=None,
                     callback=None, exc_callback=None):
            if requestID is None:
                self.requestID = id(self)
            else:
                try:
                    hash(requestID)
                except TypeError:
                    raise TypeError("requestID must be hashable.")
                self.requestID = requestID
            self.exception = False
            self.callback = callback
            self.exc_callback = exc_callback
            self.callable = callable
            self.args = args or []
            self.kwds = kwds or {}
    
    
    
  • Original post: https://www.cnblogs.com/loleina/p/9664434.html